
Commit 9d5831a

Address some more comments
1 parent 8e31266 commit 9d5831a

3 files changed: 19 additions and 14 deletions

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 10 additions & 7 deletions
@@ -310,14 +310,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     val ordering = Ordering.by[(BlockManagerId, Long), Long](_._2).reverse
     shuffleIdToReduceLocations(shuffleId) = new HashMap[Int, Array[BlockManagerId]]
     var r = 0
+    // HashMap to add up sizes of all blocks at the same location
+    val locs = new HashMap[BlockManagerId, Long]
     while (r < numReducers) {
-      // Add up sizes of all blocks at the same location
-      val locs = statuses.map { s =>
-        (s.location, s.getSizeForBlock(r))
-      }.groupBy(_._1).mapValues { sizes =>
-        sizes.map(_._2).reduceLeft(_ + _)
-      }.toIterator
-      val topLocs = CollectionUtils.takeOrdered(locs, numTopLocs)(ordering)
+      var i = 0
+      locs.clear()
+      while (i < statuses.length) {
+        locs(statuses(i).location) = locs.getOrElse(statuses(i).location, 0L) +
+          statuses(i).getSizeForBlock(r)
+        i = i + 1
+      }
+      val topLocs = CollectionUtils.takeOrdered(locs.toIterator, numTopLocs)(ordering)
       shuffleIdToReduceLocations(shuffleId) += (r -> topLocs.map(_._1).toArray)
       r = r + 1
     }
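
This hunk swaps a per-reducer groupBy/mapValues pipeline, which built several
intermediate collections on every iteration of the outer loop, for a single
mutable HashMap that is cleared and reused across reducers. A minimal standalone
sketch of the two aggregation styles, assuming hypothetical stand-ins Loc and
FakeStatus for BlockManagerId and MapStatus:

object AggregationSketch extends App {
  import scala.collection.mutable

  case class Loc(host: String)                                // stand-in for BlockManagerId
  case class FakeStatus(location: Loc, sizes: Array[Long]) {  // stand-in for MapStatus
    def getSizeForBlock(r: Int): Long = sizes(r)
  }

  val statuses = Array(
    FakeStatus(Loc("hostA"), Array(2L)),
    FakeStatus(Loc("hostA"), Array(2L)),
    FakeStatus(Loc("hostB"), Array(3L)))
  val r = 0

  // Old style: a tuple array, a grouped Map, and a mapped view per reducer.
  val viaGroupBy: Map[Loc, Long] = statuses
    .map(s => (s.location, s.getSizeForBlock(r)))
    .groupBy(_._1)
    .mapValues(_.map(_._2).sum)
    .toMap

  // New style: one mutable map, cleared and refilled for each reducer.
  val locs = new mutable.HashMap[Loc, Long]
  locs.clear()
  var i = 0
  while (i < statuses.length) {
    locs(statuses(i).location) = locs.getOrElse(statuses(i).location, 0L) +
      statuses(i).getSizeForBlock(r)
    i += 1
  }

  assert(viaGroupBy == locs.toMap) // both: Loc(hostA) -> 4, Loc(hostB) -> 3
  println(locs)
}

Both produce the same per-location totals; the mutable version simply avoids
re-allocating the intermediate collections once per reducer.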

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 1 deletion
@@ -143,7 +143,9 @@ class DAGScheduler(
   private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
   // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
   private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
-  // Number of preferred locations to use for reducer tasks
+  // Number of preferred locations to use for reducer tasks.
+  // Making this smaller will focus on the locations where the most data can be read locally, but
+  // may lead to more delay in scheduling if all of those locations are busy.
   private[scheduler] val NUM_REDUCER_PREF_LOCS = 5

   // Called by TaskScheduler to report task's starting.
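
NUM_REDUCER_PREF_LOCS bounds how many of the heaviest map-output locations each
reducer advertises as preferred, which is the trade-off the new comment spells
out. A hedged sketch of the top-N selection it feeds into, with a plain sort
standing in for CollectionUtils.takeOrdered and made-up per-host sizes:

object TopLocsSketch extends App {
  // Hypothetical total output size per host for one reducer.
  val sizesByHost = Seq(("hostB", 3L), ("hostA", 4L), ("hostC", 9L))

  // Same ordering as in the MapOutputTracker hunk: largest size first.
  val ordering = Ordering.by[(String, Long), Long](_._2).reverse

  val numTopLocs = 2 // plays the role of NUM_REDUCER_PREF_LOCS
  val topLocs = sizesByHost.sorted(ordering).take(numTopLocs)

  println(topLocs) // List((hostC,9), (hostA,4))
}

A smaller cap keeps only the hosts holding the most data (better locality), at
the cost of fewer scheduling choices when those hosts are busy.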

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -212,16 +212,16 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
       new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
     // Setup 3 map tasks
-    // on hostA with output size 1
-    // on hostA with output size 1
-    // on hostB with output size 2
+    // on hostA with output size 2
+    // on hostA with output size 2
+    // on hostB with output size 3
     tracker.registerShuffle(10, 3)
     tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(1L)))
+      Array(2L)))
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
-      Array(1L)))
-    tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
       Array(2L)))
+    tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
+      Array(3L)))

     val topLocs = tracker.getLocationsWithLargestOutputs(10, 0, 1, 1)
     assert(topLocs.nonEmpty)
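
The sizes appear to have been bumped so the hosts no longer tie: the old values
gave hostA a total of 1 + 1 = 2 against hostB's 2, while the new values give
hostA 2 + 2 = 4 against hostB's 3, making hostA the unambiguous top location.
A self-contained sketch of that arithmetic, independent of the tracker
machinery:

object ExpectedTopLocSketch extends App {
  // Mirrors the test data: (host, output size for reducer 0 from each map task).
  val outputs = Seq(("hostA", 2L), ("hostA", 2L), ("hostB", 3L))

  // Total map output per host: hostA -> 4, hostB -> 3.
  val totals = outputs.groupBy(_._1).map { case (h, xs) => (h, xs.map(_._2).sum) }

  // The single location with the largest total should therefore be hostA.
  assert(totals.maxBy(_._2)._1 == "hostA")
  println(totals)
}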
