Status: Closed

Commits (21 total; the diff below shows changes from 9 commits)
3b464b7
Set preferred locations for reduce tasks
shivaram Feb 12, 2015
34d0283
Fix style issues
shivaram Feb 12, 2015
774751b
Fix bug introduced by line length adjustment
shivaram Feb 13, 2015
bc4dfd6
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Feb 18, 2015
0171d3c
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Mar 3, 2015
5093aea
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Mar 8, 2015
df14cee
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Mar 21, 2015
ad7cb53
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Jun 4, 2015
e7d5449
Fix merge issues
shivaram Jun 4, 2015
0df3180
Address code review comments
shivaram Jun 4, 2015
8e31266
Fix style
shivaram Jun 4, 2015
9d5831a
Address some more comments
shivaram Jun 5, 2015
6cfae98
Filter out zero blocks, rename variables
shivaram Jun 5, 2015
e5d56bd
Add flag to turn off locality for shuffle deps
shivaram Jun 5, 2015
77ce7d8
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Jun 6, 2015
1090b58
Change flag name
shivaram Jun 6, 2015
68bc29e
Fix line length
shivaram Jun 6, 2015
f5be578
Use fraction of map outputs to determine locations
shivaram Jun 9, 2015
897a914
Remove unused hash map
shivaram Jun 9, 2015
2ef2d39
Address code review comments
shivaram Jun 9, 2015
492e25e
Remove unused import
shivaram Jun 9, 2015
36 changes: 35 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.{HashSet, Map}
import scala.collection.mutable.{HashSet, Map, HashMap}
Contributor: import ordering

Author: Done

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -232,6 +232,12 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()

// For each shuffleId we also maintain a Map from reducerId -> (location, size)
// Lazily populated whenever the statuses are requested from DAGScheduler
private val statusByReducer =
Contributor: nit: could you rename this to shuffleIdToReduceLocations? With the current name, I expected the main map to be keyed on reduce id.

Author: Done

new TimeStampedHashMap[Int, HashMap[Int, Array[(BlockManagerId, Long)]]]()


Contributor: nit: extra newline

Author: Fixed

// For cleaning up TimeStampedHashMaps
private val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
@@ -277,13 +283,40 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
override def unregisterShuffle(shuffleId: Int) {
mapStatuses.remove(shuffleId)
cachedSerializedStatuses.remove(shuffleId)
statusByReducer.remove(shuffleId)
}

/** Check if the given shuffle is being tracked */
def containsShuffle(shuffleId: Int): Boolean = {
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}

// Return the list of locations and blockSizes for each reducer.
// The map is keyed by reducerId and for each reducer the value contains the array
// of (location, size) of map outputs.
//
// This method is not thread-safe
Contributor: Why not use the Scaladoc syntax for this?

Author: Now using Scaladoc syntax.

def getStatusByReducer(
shuffleId: Int,
numReducers: Int)
Contributor: As discussed offline, take in a parameter here for maxNumberLocations, and then do the filtering for the top 5 on line 309-ish when you create the status method (this avoids some of the scalability issues of creating a gigantic map).

Contributor: Also, what about changing this to something like "getMapLocationsWithBiggestOutputs", and having it also accept the reduceId (and then return Option[Array[(BlockManagerId, Long)]]), since the caller always just takes one element out of the map? Then you can just add a comment internally saying that you populate the whole map for later use.

Author: I've moved the takeOrdered to MapOutputTracker now. Unfortunately, some of the savings in constructing the array can't be had, since we need the groupBy here (see the comment below). I'm also hoping the groupBy isn't expensive if we have, say, 1000 elements in the array.

@JoshRosen let me know if there is a better way to do this.
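To make the shape of this suggestion concrete, here is a rough sketch of a per-reducer lookup that keeps the top-N selection inside the tracker. It is written against names already present in this diff (statusByReducer, CollectionUtils.takeOrdered, BlockManagerId); the method name follows the reviewer's suggestion, and whether the filtering happens at population time or at lookup time is exactly the trade-off being discussed, so treat this as an illustration rather than the final API.

// Illustrative sketch only: assumes the surrounding MapOutputTrackerMaster class
// and the statusByReducer map introduced in this diff.
def getMapLocationsWithBiggestOutputs(
    shuffleId: Int,
    reducerId: Int,
    numTopLocs: Int): Option[Array[(BlockManagerId, Long)]] = {
  // Order (location, size) pairs by map-output size, largest first
  val bySizeDesc = Ordering.by[(BlockManagerId, Long), Long](_._2).reverse
  statusByReducer.get(shuffleId).flatMap { byReducer =>
    byReducer.get(reducerId).map { locs =>
      // Top-N selection stays inside the tracker, so callers never see the full map
      CollectionUtils.takeOrdered(locs.iterator, numTopLocs)(bySizeDesc).toArray
    }
  }
}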

: Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
val statuses = mapStatuses(shuffleId)
if (statuses.length > 0) {
Contributor: statuses.nonEmpty?

Author: Done

statusByReducer(shuffleId) = new HashMap[Int, Array[(BlockManagerId, Long)]]
Contributor: Also, is it possible to combine entries for the same block manager here? I think that, right now, if multiple maps are at the same block manager, they won't be aggregated when deciding which block manager ID has the most output, so if you have

BM 1: 2 bytes
BM 1: 2 bytes
BM 2: 3 bytes,

right now BM2 will be considered the biggest.

Author: Great point. I didn't take this into account before; I've now added a test in MapOutputTrackerSuite to cover this.
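A minimal, self-contained sketch of the aggregation being asked for here: sizes are summed per location before picking the biggest, so BM 1's 2 + 2 = 4 bytes wins over BM 2's 3 bytes. Plain strings stand in for BlockManagerId.

object AggregatePerLocation {
  // Sum the reported bytes per location, then pick the location with the most output
  def biggestLocation(outputs: Seq[(String, Long)]): String = {
    val totals = new scala.collection.mutable.HashMap[String, Long]()
    for ((loc, size) <- outputs) {
      totals(loc) = totals.getOrElse(loc, 0L) + size
    }
    totals.maxBy(_._2)._1
  }

  def main(args: Array[String]): Unit = {
    val outputs = Seq(("BM 1", 2L), ("BM 1", 2L), ("BM 2", 3L))
    println(biggestLocation(outputs)) // prints "BM 1", not "BM 2"
  }
}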

var r = 0
while (r < numReducers) {
val locs = statuses.map { s =>
Contributor: If perf is important here, why not write some Java-style code that does an aggregate-by-key using a mutable hashmap?

Contributor: Actually, I think we should just go with straightforward imperative code here. I don't think it will be significantly longer, and it can't be slow by accident (it will also create less garbage).

Author: You're right. It's the same three lines of code written imperatively as well, and to create a bit less garbage I now reuse the hashmap across reducers. Let me know if this looks okay.
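A sketch of the imperative shape described above, combining both suggestions from this thread: a single mutable HashMap is cleared and reused for every reducer, and sizes are summed per location as it goes. Types are simplified (string locations, a plain size matrix), so this illustrates the structure rather than the actual tracker code.

object PerReducerAggregation {
  // sizes(m)(r) = bytes that map task m produced for reducer r
  def locationsByReducer(
      mapLocs: Array[String],
      sizes: Array[Array[Long]],
      numReducers: Int): Map[Int, Array[(String, Long)]] = {
    val result = scala.collection.mutable.HashMap[Int, Array[(String, Long)]]()
    val perLocation = scala.collection.mutable.HashMap[String, Long]() // reused across reducers
    var r = 0
    while (r < numReducers) {
      perLocation.clear()
      var m = 0
      while (m < mapLocs.length) {
        // aggregate-by-key: add this map task's output for reducer r to its location's total
        perLocation(mapLocs(m)) = perLocation.getOrElse(mapLocs(m), 0L) + sizes(m)(r)
        m += 1
      }
      result(r) = perLocation.toArray
      r += 1
    }
    result.toMap
  }
}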

(s.location, s.getSizeForBlock(r))
}
statusByReducer(shuffleId) += (r -> locs)
r = r + 1
}
}
}
statusByReducer.get(shuffleId)
}

def incrementEpoch() {
epochLock.synchronized {
epoch += 1
@@ -331,6 +364,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
private def cleanup(cleanupTime: Long) {
mapStatuses.clearOldValues(cleanupTime)
cachedSerializedStatuses.clearOldValues(cleanupTime)
statusByReducer.clearOldValues(cleanupTime)
}
}

26 changes: 26 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
import org.apache.spark.util.collection.{Utils => CollectionUtils}
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
@@ -137,6 +138,14 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

// Number of map, reduce tasks above which we do not assign preferred locations
Contributor: Could you add a comment here saying that we limit the size because of scalability issues with sorting the best locations?

Author: Done

// based on map output sizes.
private val SHUFFLE_PREF_MAP_THRESHOLD = 1000
Contributor: This could be private[this] instead, but not a big deal.

Contributor: What's the difference? (I have always thought they were the same!)
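Since the question is left open in the thread: private[this] (object-private) only permits access from the same instance, while plain private also allows access from other instances of the same class; for a simple constant like this threshold the practical difference is negligible, which is presumably why it is "not a big deal". A minimal sketch of the distinction:

class Counter {
  private var a = 0       // accessible from any Counter instance
  private[this] var b = 0 // accessible only from this particular instance

  def merge(other: Counter): Unit = {
    a += other.a          // compiles: private allows cross-instance access
    // b += other.b       // does not compile: other.b is object-private
  }
}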

// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
// Number of preferred locations to use for reducer tasks
private[scheduler] val NUM_REDUCER_PREF_LOCS = 5
Contributor: One more comment request here: can you add one sentence explaining this? Something like "making this smaller will focus on the locations where the most data can be read locally, but may lead to lower scheduling efficiency or to the delay-scheduling timers expiring if all of those locations are busy".

Author: Added now.


// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -1395,6 +1404,23 @@ class DAGScheduler(
return locs
}
}
case s: ShuffleDependency[_, _, _] =>
Contributor: Can you add a high-level comment here explaining what this is doing? Something like "For reduce tasks, return the 5 locations with the largest map outputs as preferred locations".

Author: Added. I also moved the comment for narrow dependencies closer to that case.

if (rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
// Assign preferred locations for reducers by looking at map output location and sizes
val mapStatuses = mapOutputTracker.getStatusByReducer(s.shuffleId, rdd.partitions.size)
mapStatuses.map { status =>
// Get the map output locations for this reducer
if (status.contains(partition)) {
// Select first few locations as preferred locations for the reducer
val topLocs = CollectionUtils.takeOrdered(
status(partition).iterator, NUM_REDUCER_PREF_LOCS)(
Ordering.by[(BlockManagerId, Long), Long](_._2).reverse).toSeq
Contributor: It looks like this Ordering.by is effectively a constant? Maybe we can pull it out of the map here and reuse it for all loop iterations.

Author: I've moved this into MapOutputTracker now and pulled it out of the loop there.
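A small, self-contained sketch of the suggestion: build the Ordering once and reuse it for every reducer rather than allocating it inside the loop. Strings stand in for BlockManagerId, and a plain sorted/take is used in place of takeOrdered for brevity.

object TopLocations {
  // Built once and reused for all reducers: order (location, size) pairs by size, descending
  private val bySizeDesc: Ordering[(String, Long)] =
    Ordering.by[(String, Long), Long](_._2).reverse

  def top(locs: Seq[(String, Long)], n: Int): Seq[(String, Long)] =
    locs.sorted(bySizeDesc).take(n)

  def main(args: Array[String]): Unit = {
    println(top(Seq(("hostA", 10L), ("hostB", 30L), ("hostC", 20L)), 2))
    // List((hostB,30), (hostC,20))
  }
}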

return topLocs.map(_._1).map(loc => TaskLocation(loc.host, loc.executorId))
Contributor: It looks like you're returning from a map here? Maybe we can replace this map with a while loop instead.

Author: This is now avoided, since we return just one reducer's information from the MapOutputTracker; so there is now an if (option.nonEmpty) return here instead of the map.
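For context on why the reviewer flags this: a return inside the function passed to map is a non-local return in Scala (implemented with an exception under the hood). Below is a sketch of the plain-conditional shape the author describes, with simplified types; it shows the pattern, not the DAGScheduler code itself.

object ReducerPrefs {
  // Given one reducer's (location, size) pairs from the tracker, either return
  // its hosts as preferred locations or fall through to Nil.
  def preferredHosts(statusForReducer: Option[Seq[(String, Long)]]): Seq[String] = {
    if (statusForReducer.nonEmpty) {
      return statusForReducer.get.map(_._1)
    }
    Nil
  }
}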

}
}
}

case _ =>
Contributor: I think you can delete this pattern now; it shouldn't ever occur.

Incidentally, this also suggests that Dependency should be sealed. I just tried making that change, and it turned up these other warnings, which actually seem legit:

[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:136: non-variable type argument Product2[K,Any] in type pattern org.apache.spark.OneToOneDependency[Product2[K,Any]] is unchecked since it is eliminated by erasure
[warn]       case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
[warn]                                ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:135: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]     for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
[warn]                                                      ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala:109: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]       dependencies(depNum) match {
[warn]                   ^
[warn] three warnings found

It seems that code will break if they are ever given a RangeDependency
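A toy example of why sealing helps, unrelated to Spark's actual Dependency hierarchy: with a sealed parent the compiler can prove a match non-exhaustive, which is exactly what the quoted warnings report.

sealed abstract class Dep
case object Narrow extends Dep
case object Shuffle extends Dep

object SealedDemo {
  def describe(d: Dep): String = d match {
    case Shuffle => "shuffle"
    // compiler warning: "match may not be exhaustive. It would fail on the following input: Narrow"
  }
}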

}
Nil
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -490,8 +490,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
@@ -501,7 +501,7 @@
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
// we can see both result blocks now
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))
@@ -517,8 +517,8 @@
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))
@@ -560,18 +560,18 @@
assert(newEpoch > oldEpoch)
val taskSet = taskSets(0)
// should be ignored for being too old
runEvent(CompletionEvent(
taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a non-failed host
runEvent(CompletionEvent(
taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should be ignored for being too old
runEvent(CompletionEvent(
taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(
taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -800,19 +800,63 @@
assertDataStructuresEmpty()
}

test("shuffle with reducer locality") {
Contributor: OK, last supernit: can you make this name a description of what should happen? E.g. "Reduce tasks should be placed locally with map output", and for the test below, "Reduce task locality preferences should only include machines with largest map output", or something like that.

Author: Done

// Create a shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1))))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostA")))

// Reducer should run on the same host that map task ran
val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(Seq("hostA")))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

test("reducer locality with different sizes") {
val numMapTasks = scheduler.NUM_REDUCER_PREF_LOCS + 1
// Create a shuffleMapRdd with more partitions
val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))

val statuses = (1 to numMapTasks).map { i =>
(Success, makeMapStatus("host" + i, 1, (10*i).toByte))
}
complete(taskSets(0), statuses)

// Reducer should prefer the last hosts where output size is larger
val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)

val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(hosts))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
*/
private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
assert(hosts.size === taskSet.tasks.size)
for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
assert(taskLocs.map(_.host) === expectedLocs)
assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
}
}

private def makeMapStatus(host: String, reduces: Int): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))

private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)