77 changes: 15 additions & 62 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -322,36 +322,22 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
// For testing
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
}

/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given range of map output partitions (startPartition is included but
* endPartition is excluded from the range).
* endPartition is excluded from the range) within a range of mappers (startMapIndex is included
but endMapIndex is excluded). If endMapIndex is Int.MaxValue, the actual endMapIndex will be
the total number of map outputs.
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
* tuples describing the shuffle blocks that are stored at that block manager.
* Note that zero-sized blocks are excluded in the result.
*/
def getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]

/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given range of map output partitions (startPartition is included but
* endPartition is excluded from the range) and is produced by
* a range of mappers (startMapIndex, endMapIndex, startMapIndex is included and
* the endMapIndex is excluded).
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
* tuples describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByRange(
shuffleId: Int,
startMapIndex: Int,
endMapIndex: Int,
@@ -734,38 +720,22 @@ private[spark] class MapOutputTrackerMaster(
}
}

// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
// This method is only called in local-mode.
def getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
case Some (shuffleStatus) =>
shuffleStatus.withMapStatuses { statuses =>
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses, 0, shuffleStatus.mapStatuses.length)
}
case None =>
Iterator.empty
}
}

override def getMapSizesByRange(
shuffleId: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
s"partitions $startPartition-$endPartition")
logDebug(s"Fetching outputs for shuffle $shuffleId")
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.withMapStatuses { statuses =>
val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
logDebug(s"Convert map statuses for shuffle $shuffleId, " +
s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex)
shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
}
case None =>
Iterator.empty
@@ -798,37 +768,20 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
*/
private val fetchingLock = new KeyLock[Int]

// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
override def getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId, conf)
try {
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses, 0, statuses.length)
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
mapStatuses.clear()
throw e
}
}

override def getMapSizesByRange(
shuffleId: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
s"partitions $startPartition-$endPartition")
logDebug(s"Fetching outputs for shuffle $shuffleId")
val statuses = getStatuses(shuffleId, conf)
try {
val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
logDebug(s"Convert map statuses for shuffle $shuffleId, " +
s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex)
shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
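For orientation, here is a minimal caller-side sketch of the unified lookup after this change. The `tracker` and `dep` values are hypothetical placeholders, not code from this patch; the point is that passing 0 and Int.MaxValue as the map-index range reproduces the old "read every map output" behaviour, while a narrower range restricts the fetch to specific mappers.

// Hypothetical caller; `tracker` is a MapOutputTracker, `dep` a ShuffleDependency.
// All map outputs for every reduce partition: Int.MaxValue is resolved to the
// actual number of map outputs inside the tracker implementations.
val allBlocks = tracker.getMapSizesByExecutorId(
  dep.shuffleId, 0, Int.MaxValue, 0, dep.partitioner.numPartitions)

// Only the outputs of mappers [2, 5) for reduce partition 0, e.g. for a skew split.
val someBlocks = tracker.getMapSizesByExecutorId(
  dep.shuffleId, 2, 5, 0, 1)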
18 changes: 13 additions & 5 deletions core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -43,23 +43,31 @@ private[spark] trait ShuffleManager {
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]


/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
* read from all map outputs of the shuffle.
*
* Called on executors by reduce tasks.
*/
def getReader[K, C](
final def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)
}

/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
* read from map output (startMapIndex to endMapIndex - 1, inclusive).
read from a range of map outputs (startMapIndex to endMapIndex-1, inclusive).
If endMapIndex is Int.MaxValue, the actual endMapIndex will be the total number of map outputs
of the shuffle, as resolved in `getMapSizesByExecutorId`.
*
* Called on executors by reduce tasks.
*/
def getReaderForRange[K, C](
def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
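A rough illustration of the two entry points that remain after the merge; `shuffleManager`, `handle`, `context`, and `metrics` are placeholders rather than identifiers taken from the diff.

// Convenience overload: read the full map output range. The final 5-arg getReader
// simply forwards (0, Int.MaxValue) as the map-index range to the 7-arg variant.
val fullReader = shuffleManager.getReader(
  handle, startPartition, endPartition, context, metrics)

// Range overload: read only map outputs [startMapIndex, endMapIndex), as needed for
// AQE partial-reducer and partial-mapper partitions.
val rangeReader = shuffleManager.getReader(
  handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics)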
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleExecutorComponents}
import org.apache.spark.util.Utils
@@ -115,31 +116,22 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
}

/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
read from a range of map outputs (startMapIndex to endMapIndex-1, inclusive).
If endMapIndex is Int.MaxValue, the actual endMapIndex will be the total number of map outputs
of the shuffle, as resolved in `getMapSizesByExecutorId`.
*
* Called on executors by reduce tasks.
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startPartition, endPartition)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}

override def getReaderForRange[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange(
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
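Reading the surviving lines together, the merged SortShuffleManager reader ends up shaped roughly like the sketch below (reconstructed from the hunks above, not copied verbatim from the file): both the full-range and partial-range paths now flow through the same getMapSizesByExecutorId call.

override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  // The map-index range is passed straight through; Int.MaxValue is resolved by the tracker.
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}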
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -317,7 +317,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(size10000, size0, size1000, size0), 6))
assert(tracker.containsShuffle(10))
assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
assert(tracker.getMapSizesByExecutorId(10, 0, 2, 0, 4).toSeq ===
Seq(
(BlockManagerId("a", "hostA", 1000),
Seq((ShuffleBlockId(10, 5, 1), size1000, 0),
core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -104,7 +104,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
// shuffle data to read.
val mapOutputTracker = mock(classOf[MapOutputTracker])
when(mapOutputTracker.getMapSizesByExecutorId(
shuffleId, reduceId, reduceId + 1)).thenReturn {
shuffleId, 0, numMaps, reduceId, reduceId + 1)).thenReturn {
// Test a scenario where all data is local, to avoid creating a bunch of additional mocks
// for the code to read data over the network.
val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId =>
@@ -132,7 +132,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
val taskContext = TaskContext.empty()
val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId(
shuffleId, reduceId, reduceId + 1)
shuffleId, 0, numMaps, reduceId, reduceId + 1)
val shuffleReader = new BlockStoreShuffleReader(
shuffleHandle,
blocksByAddress,
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -191,7 +191,7 @@ class ShuffledRowRDD(
sqlMetricsReporter)

case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
SparkEnv.get.shuffleManager.getReaderForRange(
SparkEnv.get.shuffleManager.getReader(
dependency.shuffleHandle,
startMapIndex,
endMapIndex,
@@ -201,7 +201,7 @@
sqlMetricsReporter)

case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
SparkEnv.get.shuffleManager.getReaderForRange(
SparkEnv.get.shuffleManager.getReader(
dependency.shuffleHandle,
mapIndex,
mapIndex + 1,
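The collapsed lines above hide the reducer-side arguments; as a hedged sketch of the expected shape (not the hidden diff lines themselves), the two AQE branches call the unified reader along these lines:

// Partial reducer spec: one reduce partition, a sub-range of map outputs (skew split).
SparkEnv.get.shuffleManager.getReader(
  dependency.shuffleHandle,
  startMapIndex, endMapIndex,       // bounded map range from the partition spec
  reducerIndex, reducerIndex + 1,   // a single reduce partition
  context, sqlMetricsReporter)

// Partial mapper spec: one map output, a range of reduce partitions (local shuffle read).
SparkEnv.get.shuffleManager.getReader(
  dependency.shuffleHandle,
  mapIndex, mapIndex + 1,                 // a single mapper
  startReducerIndex, endReducerIndex,     // the reducer range from the partition spec
  context, sqlMetricsReporter)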