diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ec8621bc55cf3..18cd5de4cfada 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -322,36 +322,22 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   // For testing
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
       : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
+    getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1)
   }
 
   /**
    * Called from executors to get the server URIs and output sizes for each shuffle block that
    * needs to be read from a given range of map output partitions (startPartition is included but
-   * endPartition is excluded from the range).
+   * endPartition is excluded from the range) within a range of mappers (startMapIndex is included
+   * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
+   * changed to the length of total map outputs.
    *
    * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
    *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
    *         tuples describing the shuffle blocks that are stored at that block manager.
+   *         Note that zero-sized blocks are excluded in the result.
    */
   def getMapSizesByExecutorId(
-      shuffleId: Int,
-      startPartition: Int,
-      endPartition: Int)
-      : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
-
-  /**
-   * Called from executors to get the server URIs and output sizes for each shuffle block that
-   * needs to be read from a given range of map output partitions (startPartition is included but
-   * endPartition is excluded from the range) and is produced by
-   * a range of mappers (startMapIndex, endMapIndex, startMapIndex is included and
-   * the endMapIndex is excluded).
-   *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   */
-  def getMapSizesByRange(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
@@ -734,38 +720,22 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
-  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   // This method is only called in local-mode.
   def getMapSizesByExecutorId(
-      shuffleId: Int,
-      startPartition: Int,
-      endPartition: Int)
-      : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
-    shuffleStatuses.get(shuffleId) match {
-      case Some (shuffleStatus) =>
-        shuffleStatus.withMapStatuses { statuses =>
-          MapOutputTracker.convertMapStatuses(
-            shuffleId, startPartition, endPartition, statuses, 0, shuffleStatus.mapStatuses.length)
-        }
-      case None =>
-        Iterator.empty
-    }
-  }
-
-  override def getMapSizesByRange(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
       endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
-      s"partitions $startPartition-$endPartition")
+    logDebug(s"Fetching outputs for shuffle $shuffleId")
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
         shuffleStatus.withMapStatuses { statuses =>
+          val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
+          logDebug(s"Convert map statuses for shuffle $shuffleId, " +
+            s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
           MapOutputTracker.convertMapStatuses(
-            shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex)
+            shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
         }
       case None =>
         Iterator.empty
@@ -798,37 +768,20 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    */
   private val fetchingLock = new KeyLock[Int]
 
-  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   override def getMapSizesByExecutorId(
-      shuffleId: Int,
-      startPartition: Int,
-      endPartition: Int)
-      : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
-    val statuses = getStatuses(shuffleId, conf)
-    try {
-      MapOutputTracker.convertMapStatuses(
-        shuffleId, startPartition, endPartition, statuses, 0, statuses.length)
-    } catch {
-      case e: MetadataFetchFailedException =>
-        // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
-        mapStatuses.clear()
-        throw e
-    }
-  }
-
-  override def getMapSizesByRange(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
       endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" +
-      s"partitions $startPartition-$endPartition")
+    logDebug(s"Fetching outputs for shuffle $shuffleId")
     val statuses = getStatuses(shuffleId, conf)
     try {
+      val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
+      logDebug(s"Convert map statuses for shuffle $shuffleId, " +
+        s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
       MapOutputTracker.convertMapStatuses(
-        shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex)
+        shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
     } catch {
       case e: MetadataFetchFailedException =>
         // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 057b0d6e0b0a7..400c4526f0114 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -43,23 +43,31 @@ private[spark] trait ShuffleManager {
       context: TaskContext,
       metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
 
+
   /**
-   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
+   * read from all map outputs of the shuffle.
+   *
    * Called on executors by reduce tasks.
    */
-  def getReader[K, C](
+  final def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
-      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
+      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
+    getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)
+  }
 
   /**
    * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
-   * read from map output (startMapIndex to endMapIndex - 1, inclusive).
+   * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive).
+   * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map
+   * outputs of the shuffle in `getMapSizesByExecutorId`.
+   *
    * Called on executors by reduce tasks.
    */
-  def getReaderForRange[K, C](
+  def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index aefcb59b8bb87..72460180f5908 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleExecutorComponents}
 import org.apache.spark.util.Utils
@@ -115,23 +116,14 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   }
 
   /**
-   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
+   * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive).
+   * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map
+   * outputs of the shuffle in `getMapSizesByExecutorId`.
+   *
    * Called on executors by reduce tasks.
    */
   override def getReader[K, C](
-      handle: ShuffleHandle,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext,
-      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startPartition, endPartition)
-    new BlockStoreShuffleReader(
-      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
-      shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
-  }
-
-  override def getReaderForRange[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
       endMapIndex: Int,
@@ -139,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange(
+    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
       handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index d5ee19bde8edf..630ffd9baa06e 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -317,7 +317,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
       Array(size10000, size0, size1000, size0), 6))
     assert(tracker.containsShuffle(10))
-    assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
+    assert(tracker.getMapSizesByExecutorId(10, 0, 2, 0, 4).toSeq ===
       Seq(
         (BlockManagerId("a", "hostA", 1000),
           Seq((ShuffleBlockId(10, 5, 1), size1000, 0),
diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index a82f86a11c77e..d964b28df2983 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -104,7 +104,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
     // shuffle data to read.
     val mapOutputTracker = mock(classOf[MapOutputTracker])
     when(mapOutputTracker.getMapSizesByExecutorId(
-      shuffleId, reduceId, reduceId + 1)).thenReturn {
+      shuffleId, 0, numMaps, reduceId, reduceId + 1)).thenReturn {
       // Test a scenario where all data is local, to avoid creating a bunch of additional mocks
       // for the code to read data over the network.
       val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId =>
@@ -132,7 +132,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
     val taskContext = TaskContext.empty()
     val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
     val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId(
-      shuffleId, reduceId, reduceId + 1)
+      shuffleId, 0, numMaps, reduceId, reduceId + 1)
     val shuffleReader = new BlockStoreShuffleReader(
       shuffleHandle,
       blocksByAddress,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 5936492dd819c..b5e9655a776b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -191,7 +191,7 @@ class ShuffledRowRDD(
           sqlMetricsReporter)
 
       case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
-        SparkEnv.get.shuffleManager.getReaderForRange(
+        SparkEnv.get.shuffleManager.getReader(
           dependency.shuffleHandle,
           startMapIndex,
           endMapIndex,
@@ -201,7 +201,7 @@ class ShuffledRowRDD(
           sqlMetricsReporter)
 
       case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
-        SparkEnv.get.shuffleManager.getReaderForRange(
+        SparkEnv.get.shuffleManager.getReader(
           dependency.shuffleHandle,
           mapIndex,
          mapIndex + 1,
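After this patch, `ShuffleManager` exposes a single `getReader` entry point: the two-argument partition-range overload is `final` and simply forwards with `startMapIndex = 0` and `endMapIndex = Int.MaxValue`, while callers such as `ShuffledRowRDD` pass an explicit map-index range themselves. The sketch below illustrates the two call shapes under the unified API. It is a minimal sketch, not part of the patch: the object name, method names, and package placement are assumptions (the package is chosen only because `ShuffleManager` is `private[spark]` and must be referenced from inside the `org.apache.spark` package tree).

```scala
package org.apache.spark.shuffle

import org.apache.spark.{SparkEnv, TaskContext}

// Hypothetical helper (not part of the patch) showing the two unified getReader call shapes.
object UnifiedGetReaderSketch {

  // Read one whole reduce partition from every map output (the common, non-AQE path).
  // The two-argument-range overload is now final and forwards to the ranged one with
  // startMapIndex = 0 and endMapIndex = Int.MaxValue.
  def wholePartitionReader[K, C](
      handle: ShuffleHandle,
      reducerIndex: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
    SparkEnv.get.shuffleManager.getReader[K, C](
      handle, reducerIndex, reducerIndex + 1, context, metrics)
  }

  // Read one reduce partition from only a sub-range of map outputs, as ShuffledRowRDD does
  // for PartialReducerPartitionSpec after this change. An endMapIndex of Int.MaxValue is
  // resolved to the real map-output count inside getMapSizesByExecutorId.
  def partialPartitionReader[K, C](
      handle: ShuffleHandle,
      reducerIndex: Int,
      startMapIndex: Int,
      endMapIndex: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
    SparkEnv.get.shuffleManager.getReader[K, C](
      handle, startMapIndex, endMapIndex, reducerIndex, reducerIndex + 1, context, metrics)
  }
}
```

Using `Int.MaxValue` as the "all map outputs" sentinel keeps the trait down to one abstract reader method while leaving it to the map output tracker, which knows the actual number of map outputs, to clamp the range.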