Commits (21)
3b464b7
Set preferred locations for reduce tasks
shivaram Feb 12, 2015
34d0283
Fix style issues
shivaram Feb 12, 2015
774751b
Fix bug introduced by line length adjustment
shivaram Feb 13, 2015
bc4dfd6
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Feb 18, 2015
0171d3c
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Mar 3, 2015
5093aea
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Mar 8, 2015
df14cee
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Mar 21, 2015
ad7cb53
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Jun 4, 2015
e7d5449
Fix merge issues
shivaram Jun 4, 2015
0df3180
Address code review comments
shivaram Jun 4, 2015
8e31266
Fix style
shivaram Jun 4, 2015
9d5831a
Address some more comments
shivaram Jun 5, 2015
6cfae98
Filter out zero blocks, rename variables
shivaram Jun 5, 2015
e5d56bd
Add flag to turn off locality for shuffle deps
shivaram Jun 5, 2015
77ce7d8
Merge branch 'master' of https://github.com/apache/spark into reduce-…
shivaram Jun 6, 2015
1090b58
Change flag name
shivaram Jun 6, 2015
68bc29e
Fix line length
shivaram Jun 6, 2015
f5be578
Use fraction of map outputs to determine locations
shivaram Jun 9, 2015
897a914
Remove unused hash map
shivaram Jun 9, 2015
2ef2d39
Address code review comments
shivaram Jun 9, 2015
492e25e
Remove unused import
shivaram Jun 9, 2015
48 changes: 47 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.{HashSet, Map}
import scala.collection.mutable.{HashMap, HashSet, Map}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -284,6 +284,52 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}

/**
* Return a list of locations that each hold a fraction of the map output greater than the
* specified threshold.
*
* @param shuffleId id of the shuffle
* @param reducerId id of the reduce task
* @param numReducers total number of reducers in the shuffle
* @param fractionThreshold fraction of total map output size that a location must have
* for it to be considered large.
*
* This method is not thread-safe
*/
def getLocationsWithLargestOutputs(
shuffleId: Int,
reducerId: Int,
numReducers: Int,
fractionThreshold: Double)
: Option[Array[BlockManagerId]] = {

if (mapStatuses.contains(shuffleId)) {
val statuses = mapStatuses(shuffleId)
if (statuses.nonEmpty) {
// HashMap to add up sizes of all blocks at the same location
val locs = new HashMap[BlockManagerId, Long]
var totalOutputSize = 0L
var mapIdx = 0
while (mapIdx < statuses.length) {
Contributor: Can you save statuses(mapIdx) here and then re-use it in the 3 places below? I just find it a little harder to read with the continued reference into the array

Contributor Author: Good idea. Done

val status = statuses(mapIdx)
val blockSize = status.getSizeForBlock(reducerId)
if (blockSize > 0) {
locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
totalOutputSize += blockSize
}
mapIdx = mapIdx + 1
}
val topLocs = locs.filter { case (loc, size) =>
size.toDouble / totalOutputSize >= fractionThreshold
}
// Return if we have any locations which satisfy the required threshold
if (topLocs.nonEmpty) {
return Some(topLocs.map(_._1).toArray)
}
}
}
None
}

def incrementEpoch() {
epochLock.synchronized {
epoch += 1
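As a quick illustration of the new API, here is a minimal sketch of calling getLocationsWithLargestOutputs directly, mirroring the setup used in the MapOutputTrackerSuite test further down. The shuffle id, block sizes and host names are illustrative only, and the snippet assumes it runs inside the org.apache.spark package (MapOutputTrackerMaster is private[spark]); no RPC endpoint is needed for master-side registration and queries.

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

val tracker = new MapOutputTrackerMaster(new SparkConf())

// Register a shuffle with 3 map tasks: two on hostA (2 bytes each) and one on hostB (3 bytes).
tracker.registerShuffle(10, 3)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), Array(2L)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000), Array(2L)))
tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000), Array(3L)))

// With a 0.5 threshold only hostA qualifies, since it holds 4 of the 7 bytes destined for reducer 0.
val prefs = tracker.getLocationsWithLargestOutputs(
  shuffleId = 10, reducerId = 0, numReducers = 1, fractionThreshold = 0.5)
prefs.foreach(locs => println(locs.map(_.host).mkString(", ")))   // prints: hostA
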
37 changes: 34 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -137,6 +137,22 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

// Flag to control if reduce tasks are assigned preferred locations
private val shuffleLocalityEnabled =
sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
// Number of map, reduce tasks above which we do not assign preferred locations
Contributor: Could you add a comment here saying that we limit the size because of scalability issues with sorting the best locations?

Contributor Author: Done

// based on map output sizes. We limit the size of jobs for which we assign preferred locations
// as computing the top locations by size becomes expensive.
private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000

// Fraction of total map output that must be at a location for it to be considered a preferred
// location for a reduce task.
// Making this larger will focus on fewer locations where most data can be read locally, but
// may lead to more delay in scheduling if those locations are busy.
private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2

// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -1384,17 +1400,32 @@ class DAGScheduler(
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.

rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case s: ShuffleDependency[_, _, _] =>
Contributor: Can you add a high-level comment here explaining what this is doing? Something like "For reduce tasks, return the 5 locations with the largest map outputs as preferred locations"

Contributor Author: Added -- Also I moved the comment for narrow dependencies close to that.

// For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
// of data as preferred locations
if (shuffleLocalityEnabled &&
rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
// Get the preferred map output locations for this reducer
val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
if (topLocsForReducer.nonEmpty) {
return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
}
}

case _ =>
Contributor: I think you can delete this pattern now, it shouldn't ever occur.

Incidentally, this also suggests that Dependency should be sealed. I just tried making that change, and it turned up these other warnings, which actually seem legit:

[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:136: non-variable type argument Product2[K,Any] in type pattern org.apache.spark.OneToOneDependency[Product2[K,Any]] is unchecked since it is eliminated by erasure
[warn]       case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
[warn]                                ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:135: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]     for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
[warn]                                                      ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala:109: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]       dependencies(depNum) match {
[warn]                   ^
[warn] three warnings found

It seems that code will break if they are ever given a RangeDependency

}
Nil
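For reference, here is a minimal sketch of turning the new behaviour off from user code via the spark.shuffle.reduceLocality.enabled flag introduced above (default true). The application name, master URL and toy job are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

// Disable locality preferences for reduce tasks, e.g. when map output is spread evenly
// and waiting for busy hosts via delay scheduling is not worthwhile.
val conf = new SparkConf()
  .setAppName("shuffle-locality-demo")
  .setMaster("local[4]")
  .set("spark.shuffle.reduceLocality.enabled", "false")
val sc = new SparkContext(conf)

// Reduce tasks of this shuffle will not be given preferred locations by the DAGScheduler.
val counts = sc.parallelize(1 to 1000, 8).map(i => (i % 10, 1)).reduceByKey(_ + _).collect()
sc.stop()
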
35 changes: 35 additions & 0 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -205,4 +205,39 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// masterTracker.stop() // this throws an exception
rpcEnv.shutdown()
}

test("getLocationsWithLargestOutputs with multiple outputs in same machine") {
val rpcEnv = createRpcEnv("test")
val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
// Setup 3 map tasks
// on hostA with output size 2
// on hostA with output size 2
// on hostB with output size 3
tracker.registerShuffle(10, 3)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L)))
tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(3L)))

// When the threshold is 50%, only host A should be returned as a preferred location
// as it has 4 out of 7 bytes of output.
val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
Contributor: Can you add comments to this test -- just one here saying "When the threshold is 50%, only host A should be returned a preferred location" and then below, "When the threshold is only 20%, both hosts should be returned"

Contributor Author: Done

assert(topLocs50.nonEmpty)
assert(topLocs50.get.size === 1)
assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))

// When the threshold is 20%, both hosts should be returned as preferred locations.
val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
assert(topLocs20.nonEmpty)
assert(topLocs20.get.size === 2)
assert(topLocs20.get.toSet ===
Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet)

tracker.stop()
rpcEnv.shutdown()
}
}
@@ -490,8 +490,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
@@ -501,7 +501,7 @@
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
// we can see both result blocks now
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))
Expand All @@ -517,8 +517,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))
@@ -560,18 +560,18 @@
assert(newEpoch > oldEpoch)
val taskSet = taskSets(0)
// should be ignored for being too old
runEvent(CompletionEvent(
taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a non-failed host
runEvent(CompletionEvent(
taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should be ignored for being too old
runEvent(CompletionEvent(
taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(
taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -800,19 +800,63 @@
assertDataStructuresEmpty()
}

test("reduce tasks should be placed locally with map output") {
// Create a shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1))))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostA")))

// Reducer should run on the same host that the map task ran on
val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(Seq("hostA")))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

test("reduce task locality preferences should only include machines with largest map outputs") {
val numMapTasks = 4
// Create a shuffleMapRdd with more partitions
val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))

val statuses = (1 to numMapTasks).map { i =>
(Success, makeMapStatus("host" + i, 1, (10*i).toByte))
}
complete(taskSets(0), statuses)

// Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)

val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(hosts))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
*/
private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
assert(hosts.size === taskSet.tasks.size)
for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
assert(taskLocs.map(_.host) === expectedLocs)
assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
}
}

private def makeMapStatus(host: String, reduces: Int): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))

private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
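To make the locality threshold in the second new test concrete, here is a small standalone sketch of the fraction computation: map outputs of 10, 20, 30 and 40 bytes give the four hosts 10%, 20%, 30% and 40% of the total, so with REDUCER_PREF_LOCS_FRACTION = 0.2 only host2, host3 and host4 are kept. The map literal and variable names are illustrative only.

val sizesByHost = Map("host1" -> 10L, "host2" -> 20L, "host3" -> 30L, "host4" -> 40L)
val total = sizesByHost.values.sum.toDouble

// Keep the hosts holding at least 20% of the total map output, as the DAGScheduler does.
val preferred = sizesByHost.collect {
  case (host, size) if size / total >= 0.2 => host
}.toSeq.sorted

println(preferred)   // host2, host3, host4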