Skip to content

Commit f5be578

Browse files
committed
Use fraction of map outputs to determine locations
Also removes caching of preferred locations to make the API cleaner
1 parent 68bc29e commit f5be578

File tree

4 files changed

+51
-34
lines changed

4 files changed

+51
-34
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -292,44 +292,51 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
292292
}
293293

294294
/**
295-
* Return a list of locations which have the largest map outputs given a shuffleId
296-
* and a reducerId.
295+
* Return a list of locations which have fraction of map output greater than specified threshold.
296+
*
297+
* @param shuffleId id of the shuffle
298+
* @param reducerId id of the reduce task
299+
* @param numReducers total number of reducers in the shuffle
300+
* @param fractionThreshold fraction of total map output size that a location must have
301+
* for it to be considered large.
297302
*
298303
* This method is not thread-safe
299304
*/
300305
def getLocationsWithLargestOutputs(
301306
shuffleId: Int,
302307
reducerId: Int,
303308
numReducers: Int,
304-
numTopLocs: Int)
309+
fractionThreshold: Double)
305310
: Option[Array[BlockManagerId]] = {
306-
if (!shuffleIdToReduceLocations.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
311+
312+
if (mapStatuses.contains(shuffleId)) {
307313
// Pre-compute the top locations for each reducer and cache it
308314
val statuses = mapStatuses(shuffleId)
309315
if (statuses.nonEmpty) {
310-
val ordering = Ordering.by[(BlockManagerId, Long), Long](_._2).reverse
311-
shuffleIdToReduceLocations(shuffleId) = new HashMap[Int, Array[BlockManagerId]]
312-
var reduceIdx = 0
313316
// HashMap to add up sizes of all blocks at the same location
314317
val locs = new HashMap[BlockManagerId, Long]
315-
while (reduceIdx < numReducers) {
316-
var mapIdx = 0
317-
locs.clear()
318-
while (mapIdx < statuses.length) {
319-
val blockSize = statuses(mapIdx).getSizeForBlock(reduceIdx)
320-
if (blockSize > 0) {
321-
locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
322-
blockSize
323-
}
324-
mapIdx = mapIdx + 1
318+
var totalOutputSize = 0L
319+
var mapIdx = 0
320+
while (mapIdx < statuses.length) {
321+
val blockSize = statuses(mapIdx).getSizeForBlock(reducerId)
322+
if (blockSize > 0) {
323+
locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
324+
blockSize
325+
totalOutputSize += blockSize
325326
}
326-
val topLocs = CollectionUtils.takeOrdered(locs.toIterator, numTopLocs)(ordering)
327-
shuffleIdToReduceLocations(shuffleId) += (reduceIdx -> topLocs.map(_._1).toArray)
328-
reduceIdx = reduceIdx + 1
327+
mapIdx = mapIdx + 1
328+
}
329+
val topLocs = locs.filter { case (loc, size) =>
330+
size.toDouble / totalOutputSize >= fractionThreshold
331+
}
332+
// Only add this reducer to our map if we have any locations which satisfy
333+
// the required threshold
334+
if (topLocs.nonEmpty) {
335+
return Some(topLocs.map(_._1).toArray)
329336
}
330337
}
331338
}
332-
shuffleIdToReduceLocations.get(shuffleId).flatMap(_.get(reducerId))
339+
None
333340
}
334341

335342
def incrementEpoch() {

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,13 @@ class DAGScheduler(
146146
private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
147147
// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
148148
private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
149-
// Number of preferred locations to use for reducer tasks.
150-
// Making this smaller will focus on the locations where the most data can be read locally, but
151-
// may lead to more delay in scheduling if all of those locations are busy.
152-
private[scheduler] val NUM_REDUCER_PREF_LOCS = 5
149+
150+
// Fraction of total map output that must be at a location for it to considered as a preferred
151+
// location for a reduce task.
152+
// Making this larger will focus on fewer locations where most data can be read locally, but
153+
// may lead to more delay in scheduling if those locations are busy.
154+
//
155+
private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
153156

154157
// Called by TaskScheduler to report task's starting.
155158
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
@@ -1411,14 +1414,14 @@ class DAGScheduler(
14111414
}
14121415
}
14131416
case s: ShuffleDependency[_, _, _] =>
1414-
// For shuffle dependencies, pick the 5 locations with the largest map outputs as preferred
1415-
// locations
1417+
// For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
1418+
// of data as preferred locations
14161419
if (shuffleLocalityEnabled &&
14171420
rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
14181421
s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
14191422
// Get the preferred map output locations for this reducer
14201423
val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
1421-
partition, rdd.partitions.size, NUM_REDUCER_PREF_LOCS)
1424+
partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
14221425
if (topLocsForReducer.nonEmpty) {
14231426
return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
14241427
}

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,17 @@ class MapOutputTrackerSuite extends SparkFunSuite {
223223
tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
224224
Array(3L)))
225225

226-
val topLocs = tracker.getLocationsWithLargestOutputs(10, 0, 1, 1)
227-
assert(topLocs.nonEmpty)
228-
assert(topLocs.get.size === 1)
229-
assert(topLocs.get.head === BlockManagerId("a", "hostA", 1000))
226+
val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
227+
assert(topLocs50.nonEmpty)
228+
assert(topLocs50.get.size === 1)
229+
assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
230+
231+
val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
232+
assert(topLocs20.nonEmpty)
233+
assert(topLocs20.get.size === 2)
234+
assert(topLocs20.get.toSet ===
235+
Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet)
236+
230237
tracker.stop()
231238
rpcEnv.shutdown()
232239
}

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -821,7 +821,7 @@ class DAGSchedulerSuite
821821
}
822822

823823
test("reducer locality with different sizes") {
824-
val numMapTasks = scheduler.NUM_REDUCER_PREF_LOCS + 1
824+
val numMapTasks = 4
825825
// Create an shuffleMapRdd with more partitions
826826
val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
827827
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -834,7 +834,7 @@ class DAGSchedulerSuite
834834
}
835835
complete(taskSets(0), statuses)
836836

837-
// Reducer should prefer the last hosts where output size is larger
837+
// Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
838838
val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
839839

840840
val reduceTaskSet = taskSets(1)

0 commit comments

Comments
 (0)