Skip to content

Commit 2ef2d39

Browse files
committed
Address code review comments
1 parent 897a914 commit 2ef2d39

File tree

4 files changed

+10
-10
lines changed

4 files changed

+10
-10
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,27 +304,25 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
304304
: Option[Array[BlockManagerId]] = {
305305

306306
if (mapStatuses.contains(shuffleId)) {
307-
// Pre-compute the top locations for each reducer and cache it
308307
val statuses = mapStatuses(shuffleId)
309308
if (statuses.nonEmpty) {
310309
// HashMap to add up sizes of all blocks at the same location
311310
val locs = new HashMap[BlockManagerId, Long]
312311
var totalOutputSize = 0L
313312
var mapIdx = 0
314313
while (mapIdx < statuses.length) {
315-
val blockSize = statuses(mapIdx).getSizeForBlock(reducerId)
314+
val status = statuses(mapIdx)
315+
val blockSize = status.getSizeForBlock(reducerId)
316316
if (blockSize > 0) {
317-
locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
318-
blockSize
317+
locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
319318
totalOutputSize += blockSize
320319
}
321320
mapIdx = mapIdx + 1
322321
}
323322
val topLocs = locs.filter { case (loc, size) =>
324323
size.toDouble / totalOutputSize >= fractionThreshold
325324
}
326-
// Only add this reducer to our map if we have any locations which satisfy
327-
// the required threshold
325+
// Return if we have any locations which satisfy the required threshold
328326
if (topLocs.nonEmpty) {
329327
return Some(topLocs.map(_._1).toArray)
330328
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class DAGScheduler(
142142
sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
143143
// Number of map, reduce tasks above which we do not assign preferred locations
144144
// based on map output sizes. We limit the size of jobs for which assign preferred locations
145-
// as sorting the locations by size becomes expensive.
145+
// as computing the top locations by size becomes expensive.
146146
private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
147147
// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
148148
private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
@@ -151,7 +151,6 @@ class DAGScheduler(
151151
// location for a reduce task.
152152
// Making this larger will focus on fewer locations where most data can be read locally, but
153153
// may lead to more delay in scheduling if those locations are busy.
154-
//
155154
private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
156155

157156
// Called by TaskScheduler to report task's starting.

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,14 @@ class MapOutputTrackerSuite extends SparkFunSuite {
223223
tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
224224
Array(3L)))
225225

226+
// When the threshold is 50%, only host A should be returned as a preferred location
227+
// as it has 4 out of 7 bytes of output.
226228
val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
227229
assert(topLocs50.nonEmpty)
228230
assert(topLocs50.get.size === 1)
229231
assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
230232

233+
// When the threshold is 20%, both hosts should be returned as preferred locations.
231234
val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
232235
assert(topLocs20.nonEmpty)
233236
assert(topLocs20.get.size === 2)

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -800,7 +800,7 @@ class DAGSchedulerSuite
800800
assertDataStructuresEmpty()
801801
}
802802

803-
test("shuffle with reducer locality") {
803+
test("reduce tasks should be placed locally with map output") {
804804
// Create an shuffleMapRdd with 1 partition
805805
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
806806
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -820,7 +820,7 @@ class DAGSchedulerSuite
820820
assertDataStructuresEmpty
821821
}
822822

823-
test("reducer locality with different sizes") {
823+
test("reduce task locality preferences should only include machines with largest map outputs") {
824824
val numMapTasks = 4
825825
// Create an shuffleMapRdd with more partitions
826826
val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)

0 commit comments

Comments
 (0)