From 08a8f74f30b344251dc003bb9a57c7d248ae066c Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 00:41:00 +0800 Subject: [PATCH 01/15] unify getreader --- .../org/apache/spark/MapOutputTracker.scala | 67 ++----------------- .../org/apache/spark/rdd/CoGroupedRDD.scala | 4 +- .../org/apache/spark/rdd/ShuffledRDD.scala | 3 +- .../org/apache/spark/rdd/SubtractedRDD.scala | 1 + .../apache/spark/shuffle/ShuffleManager.scala | 19 ++---- .../shuffle/sort/SortShuffleManager.scala | 19 +----- .../apache/spark/MapOutputTrackerSuite.scala | 2 +- .../scala/org/apache/spark/ShuffleSuite.scala | 3 +- .../spark/scheduler/CustomShuffledRDD.scala | 3 +- .../BlockStoreShuffleReaderSuite.scala | 4 +- .../spark/sql/execution/ShuffledRowRDD.scala | 11 ++- 11 files changed, 32 insertions(+), 104 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ec8621bc55cf3..825ada1b06702 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,7 +322,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) + getMapSizesByExecutorId(shuffleId, mapStatus => (0, mapStatus.length), reduceId, reduceId + 1) } /** @@ -336,28 +336,11 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging */ def getMapSizesByExecutorId( shuffleId: Int, + mapIndexRange: Array[MapStatus] => (Int, Int), startPartition: Int, endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] - /** - * Called from executors to get the server URIs and output sizes for each shuffle block that - * needs to be read from a given range of map output partitions (startPartition is included but - * endPartition is excluded from the range) and is produced by - * a range of mappers (startMapIndex, endMapIndex, startMapIndex is included and - * the endMapIndex is excluded). - * - * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, - * and the second item is a sequence of (shuffle block id, shuffle block size, map index) - * tuples describing the shuffle blocks that are stored at that block manager. - */ - def getMapSizesByRange( - shuffleId: Int, - startMapIndex: Int, - endMapIndex: Int, - startPartition: Int, - endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] - /** * Deletes map output status information for the specified shuffle stage. */ @@ -738,32 +721,15 @@ private[spark] class MapOutputTrackerMaster( // This method is only called in local-mode. 
def getMapSizesByExecutorId( shuffleId: Int, + mapIndexRange: Array[MapStatus] => (Int, Int), startPartition: Int, endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => shuffleStatus.withMapStatuses { statuses => - MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, 0, shuffleStatus.mapStatuses.length) - } - case None => - Iterator.empty - } - } - - override def getMapSizesByRange( - shuffleId: Int, - startMapIndex: Int, - endMapIndex: Int, - startPartition: Int, - endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" + - s"partitions $startPartition-$endPartition") - shuffleStatuses.get(shuffleId) match { - case Some(shuffleStatus) => - shuffleStatus.withMapStatuses { statuses => + val (startMapIndex, endMapIndex) = mapIndexRange(statuses) MapOutputTracker.convertMapStatuses( shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) } @@ -801,32 +767,14 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. override def getMapSizesByExecutorId( shuffleId: Int, + mapIndexRange: Array[MapStatus] => (Int, Int), startPartition: Int, endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId, conf) try { - MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, 0, statuses.length) - } catch { - case e: MetadataFetchFailedException => - // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: - mapStatuses.clear() - throw e - } - } - - override def getMapSizesByRange( - shuffleId: Int, - startMapIndex: Int, - endMapIndex: Int, - startPartition: Int, - endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" + - s"partitions $startPartition-$endPartition") - val statuses = getStatuses(shuffleId, conf) - try { + val (startMapIndex, endMapIndex) = mapIndexRange(statuses) MapOutputTracker.convertMapStatuses( shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) } catch { @@ -988,7 +936,6 @@ private[spark] object MapOutputTracker extends Logging { * @param endPartition End of map output partition ID range (excluded from range) * @param statuses List of map statuses, indexed by map partition index. * @param startMapIndex Start Map index. - * @param endMapIndex End Map index. * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) * tuples describing the shuffle blocks that are stored at that block manager. 
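The new mapIndexRange parameter above (Array[MapStatus] => (Int, Int)) replaces the separate getMapSizesByRange entry point: callers now describe which mappers to read as a function from the registered map statuses to a half-open index range. A minimal, self-contained sketch of the two call shapes the rest of the series relies on (plain Scala; MapStatus is a stand-in type and the names are illustrative, not Spark's API):

    object MapIndexRangeSketch {
      // Stand-in for org.apache.spark.scheduler.MapStatus; only the count matters here.
      type MapStatus = AnyRef

      // Shape of the new parameter: given all map statuses, return the half-open
      // range [startMapIndex, endMapIndex) of mappers to read from.
      type MapIndexRange = Array[MapStatus] => (Int, Int)

      // "Read every mapper" -- what ShuffledRDD/CoGroupedRDD/SubtractedRDD pass.
      val allMappers: MapIndexRange = statuses => (0, statuses.length)

      // "Read a fixed sub-range" -- what the AQE paths in ShuffledRowRDD pass.
      def fixedRange(start: Int, end: Int): MapIndexRange = _ => (start, end)

      def main(args: Array[String]): Unit = {
        val statuses = Array.fill[MapStatus](5)(new Object)
        println(allMappers(statuses))       // (0,5)
        println(fixedRange(2, 4)(statuses)) // (2,4)
      }
    }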
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 500d306f336ac..d1cf79b20aab5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -145,7 +145,9 @@ class CoGroupedRDD[K: ClassTag]( val metrics = context.taskMetrics().createTempShuffleReadMetrics() val it = SparkEnv.get.shuffleManager .getReader( - shuffleDependency.shuffleHandle, split.index, split.index + 1, context, metrics) + shuffleDependency.shuffleHandle, + mapStatus => (0, mapStatus.length), + split.index, split.index + 1, context, metrics) .read() rddIterators += ((it, depNum)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index 0930a5c9cfb96..edd049a50458f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -103,7 +103,8 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dep.shuffleHandle, split.index, split.index + 1, context, metrics) + dep.shuffleHandle, mapStatus => (0, mapStatus.length), + split.index, split.index + 1, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] } diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala index d5a811d4dc3fd..3628a25d88c14 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala @@ -111,6 +111,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag]( val iter = SparkEnv.get.shuffleManager .getReader( shuffleDependency.shuffleHandle, + mapStatus => (0, mapStatus.length), partition.index, partition.index + 1, context, diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 057b0d6e0b0a7..7443c120e4edc 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.shuffle import org.apache.spark.{ShuffleDependency, TaskContext} +import org.apache.spark.scheduler.MapStatus /** * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver @@ -43,26 +44,14 @@ private[spark] trait ShuffleManager { context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] - /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). - * Called on executors by reduce tasks. - */ - def getReader[K, C]( - handle: ShuffleHandle, - startPartition: Int, - endPartition: Int, - context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] - /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to - * read from map output (startMapIndex to endMapIndex - 1, inclusive). + * read from map output which specified by mapIndexRange. * Called on executors by reduce tasks. 
*/ - def getReaderForRange[K, C]( + def getReader[K, C]( handle: ShuffleHandle, - startMapIndex: Int, - endMapIndex: Int, + mapIndexRange: Array[MapStatus] => (Int, Int), startPartition: Int, endPartition: Int, context: TaskContext, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index aefcb59b8bb87..5f7d5801338a6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.{config, Logging} +import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle._ import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleExecutorComponents} import org.apache.spark.util.Utils @@ -120,27 +121,13 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager */ override def getReader[K, C]( handle: ShuffleHandle, + mapIndexRange: Array[MapStatus] => (Int, Int), startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( - handle.shuffleId, startPartition, endPartition) - new BlockStoreShuffleReader( - handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, - shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) - } - - override def getReaderForRange[K, C]( - handle: ShuffleHandle, - startMapIndex: Int, - endMapIndex: Int, - startPartition: Int, - endPartition: Int, - context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange( - handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) + handle.shuffleId, mapIndexRange, startPartition, endPartition) new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index d5ee19bde8edf..67d59cc243345 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -317,7 +317,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(size10000, size0, size1000, size0), 6)) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq === + assert(tracker.getMapSizesByExecutorId(10, _ => (0, 2), 0, 4).toSeq === Seq( (BlockManagerId("a", "hostA", 1000), Seq((ShuffleBlockId(10, 5, 1), size1000, 0), diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 9e39271bdf9ee..673efcbce388e 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -411,7 +411,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val taskContext = new TaskContextImpl( 1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem) val metrics = 
taskContext.taskMetrics.createTempShuffleReadMetrics() - val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1, taskContext, metrics) + val reader = manager.getReader[Int, Int](shuffleHandle, mapStatus => (0, mapStatus.length), + 0, 1, taskContext, metrics) TaskContext.unset() val readData = reader.read().toIndexedSeq assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala index 46e5e6f97b1f1..21ed025641d5b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala @@ -106,7 +106,8 @@ class CustomShuffledRDD[K, V, C]( val part = p.asInstanceOf[CustomShuffledRDDPartition] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dependency.shuffleHandle, part.startIndexInParent, part.endIndexInParent, context, metrics) + dependency.shuffleHandle, mapStatus => (0, mapStatus.length), part.startIndexInParent, + part.endIndexInParent, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] } diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index a82f86a11c77e..14aa16c2a2f53 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -104,7 +104,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) when(mapOutputTracker.getMapSizesByExecutorId( - shuffleId, reduceId, reduceId + 1)).thenReturn { + shuffleId, null, reduceId, reduceId + 1)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. 
val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => @@ -132,7 +132,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext val taskContext = TaskContext.empty() val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId( - shuffleId, reduceId, reduceId + 1) + shuffleId, null, reduceId, reduceId + 1) val shuffleReader = new BlockStoreShuffleReader( shuffleHandle, blocksByAddress, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 5936492dd819c..4fea3c530b066 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -185,26 +185,25 @@ class ShuffledRowRDD( case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, + mapStatus => (0, mapStatus.length), startReducerIndex, endReducerIndex, context, sqlMetricsReporter) case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) => - SparkEnv.get.shuffleManager.getReaderForRange( + SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, - startMapIndex, - endMapIndex, + _ => (startMapIndex, endMapIndex), reducerIndex, reducerIndex + 1, context, sqlMetricsReporter) case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) => - SparkEnv.get.shuffleManager.getReaderForRange( + SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, - mapIndex, - mapIndex + 1, + _ => (mapIndex, mapIndex + 1), startReducerIndex, endReducerIndex, context, From f2d28dd1f3c8bcdb1abf55e54f7d32609078b747 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 00:43:36 +0800 Subject: [PATCH 02/15] revert param endMapIndex --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 825ada1b06702..8691945ad30b3 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -936,6 +936,7 @@ private[spark] object MapOutputTracker extends Logging { * @param endPartition End of map output partition ID range (excluded from range) * @param statuses List of map statuses, indexed by map partition index. * @param startMapIndex Start Map index. + * @param endMapIndex End Map index. * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) * tuples describing the shuffle blocks that are stored at that block manager. 
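Patch 01's ShuffledRowRDD hunk above is where the unification pays off: the three AQE partition specs now differ only in the range function handed to the single getReader. A condensed sketch of that dispatch, using stand-in case classes rather than the real SQL types:

    object PartitionSpecDispatchSketch {
      type MapStatus = AnyRef
      type MapIndexRange = Array[MapStatus] => (Int, Int)

      // Simplified stand-ins for the partition specs used by ShuffledRowRDD.
      sealed trait Spec
      case class Coalesced(startReducer: Int, endReducer: Int) extends Spec
      case class PartialReducer(reducer: Int, startMap: Int, endMap: Int) extends Spec
      case class PartialMapper(mapIndex: Int, startReducer: Int, endReducer: Int) extends Spec

      // Returns (mapIndexRange, startPartition, endPartition) as getReader would receive them.
      def readerArgs(spec: Spec): (MapIndexRange, Int, Int) = spec match {
        case Coalesced(start, end)           => (statuses => (0, statuses.length), start, end)
        case PartialReducer(reducer, sm, em) => (_ => (sm, em), reducer, reducer + 1)
        case PartialMapper(m, sr, er)        => (_ => (m, m + 1), sr, er)
      }

      def main(args: Array[String]): Unit = {
        val statuses = Array.fill[MapStatus](8)(new Object)
        val (range, sp, ep) = readerArgs(PartialReducer(reducer = 3, startMap = 2, endMap = 5))
        println((range(statuses), sp, ep)) // ((2,5),3,4)
      }
    }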
From 356cd501ac6994c7fa31a33042454d442afb6387 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 10:29:16 +0800 Subject: [PATCH 03/15] allMapStatus --- .../main/scala/org/apache/spark/MapOutputTracker.scala | 5 ++++- .../main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 2 +- .../main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 2 +- .../scala/org/apache/spark/rdd/SubtractedRDD.scala | 10 ++-------- .../src/test/scala/org/apache/spark/ShuffleSuite.scala | 2 +- .../org/apache/spark/scheduler/CustomShuffledRDD.scala | 2 +- .../apache/spark/sql/execution/ShuffledRowRDD.scala | 2 +- 7 files changed, 11 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 8691945ad30b3..2d917f2619bc6 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,7 +322,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - getMapSizesByExecutorId(shuffleId, mapStatus => (0, mapStatus.length), reduceId, reduceId + 1) + getMapSizesByExecutorId(shuffleId, MapOutputTracker.allMapStatus, reduceId, reduceId + 1) } /** @@ -842,6 +842,9 @@ private[spark] object MapOutputTracker extends Logging { private val DIRECT = 0 private val BROADCAST = 1 + // return the mapIndexRange[startMapIndex, endMapIndex), which includes all the MapStatus + val allMapStatus: Array[MapStatus] => (Int, Int) = mapStatus => (0, mapStatus.length) + // Serialize an array of map output locations into an efficient byte format so that we can send // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will // generally be pretty compressible because many map outputs will be on the same hostname. 
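The hunk above gives the repeated "read everything" lambda a single name, MapOutputTracker.allMapStatus, so every call site spells the same intent. A tiny sketch of the pattern (stand-in object, not the real tracker):

    object AllMapStatusSketch {
      type MapStatus = AnyRef

      // Companion-object style constant mirroring the one added above.
      object Tracker {
        val allMapStatus: Array[MapStatus] => (Int, Int) = statuses => (0, statuses.length)
      }

      def main(args: Array[String]): Unit = {
        // Call sites reference the shared value instead of re-writing the lambda.
        val statuses = Array.fill[MapStatus](3)(new Object)
        println(Tracker.allMapStatus(statuses)) // (0,3)
      }
    }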
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index d1cf79b20aab5..00dbe5dd721e5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -146,7 +146,7 @@ class CoGroupedRDD[K: ClassTag]( val it = SparkEnv.get.shuffleManager .getReader( shuffleDependency.shuffleHandle, - mapStatus => (0, mapStatus.length), + MapOutputTracker.allMapStatus, split.index, split.index + 1, context, metrics) .read() rddIterators += ((it, depNum)) diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index edd049a50458f..b2290da30f2c1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -103,7 +103,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dep.shuffleHandle, mapStatus => (0, mapStatus.length), + dep.shuffleHandle, MapOutputTracker.allMapStatus, split.index, split.index + 1, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala index 3628a25d88c14..67cc53298634e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala @@ -23,13 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import org.apache.spark.Dependency -import org.apache.spark.OneToOneDependency -import org.apache.spark.Partition -import org.apache.spark.Partitioner -import org.apache.spark.ShuffleDependency -import org.apache.spark.SparkEnv -import org.apache.spark.TaskContext +import org.apache.spark.{Dependency, MapOutputTracker, OneToOneDependency, Partition, Partitioner, ShuffleDependency, SparkEnv, TaskContext} /** * An optimized version of cogroup for set difference/subtraction. 
@@ -111,7 +105,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag]( val iter = SparkEnv.get.shuffleManager .getReader( shuffleDependency.shuffleHandle, - mapStatus => (0, mapStatus.length), + MapOutputTracker.allMapStatus, partition.index, partition.index + 1, context, diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 673efcbce388e..a2940eefb65ca 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -411,7 +411,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val taskContext = new TaskContextImpl( 1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem) val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() - val reader = manager.getReader[Int, Int](shuffleHandle, mapStatus => (0, mapStatus.length), + val reader = manager.getReader[Int, Int](shuffleHandle, MapOutputTracker.allMapStatus, 0, 1, taskContext, metrics) TaskContext.unset() val readData = reader.read().toIndexedSeq diff --git a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala index 21ed025641d5b..a1666c949f8f3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala @@ -106,7 +106,7 @@ class CustomShuffledRDD[K, V, C]( val part = p.asInstanceOf[CustomShuffledRDDPartition] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dependency.shuffleHandle, mapStatus => (0, mapStatus.length), part.startIndexInParent, + dependency.shuffleHandle, MapOutputTracker.allMapStatus, part.startIndexInParent, part.endIndexInParent, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 4fea3c530b066..5601bd6a42b2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -185,7 +185,7 @@ class ShuffledRowRDD( case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, - mapStatus => (0, mapStatus.length), + MapOutputTracker.allMapStatus, startReducerIndex, endReducerIndex, context, From 84a661a0db3eabd64099c90d1d15581253391f6b Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 10:33:05 +0800 Subject: [PATCH 04/15] update log --- .../main/scala/org/apache/spark/MapOutputTracker.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 2d917f2619bc6..14a5722c4ad59 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -725,11 +725,13 @@ private[spark] class MapOutputTrackerMaster( startPartition: Int, endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") + logDebug(s"Fetching outputs for shuffle $shuffleId") 
shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => shuffleStatus.withMapStatuses { statuses => val (startMapIndex, endMapIndex) = mapIndexRange(statuses) + logDebug(s"Convert map statuses for shuffle $shuffleId, " + + s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex") MapOutputTracker.convertMapStatuses( shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) } @@ -771,10 +773,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr startPartition: Int, endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") + logDebug(s"Fetching outputs for shuffle $shuffleId") val statuses = getStatuses(shuffleId, conf) try { val (startMapIndex, endMapIndex) = mapIndexRange(statuses) + logDebug(s"Convert map statuses for shuffle $shuffleId, " + + s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex") MapOutputTracker.convertMapStatuses( shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) } catch { From 5496bbfaeafe5cc06d1a2de2ef7f787ad32cb94e Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 10:34:53 +0800 Subject: [PATCH 05/15] allMapStatus -> allMapStatuses --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 6 +++--- core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala | 2 +- .../src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala | 2 +- core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 2 +- .../org/apache/spark/scheduler/CustomShuffledRDD.scala | 2 +- .../org/apache/spark/sql/execution/ShuffledRowRDD.scala | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 14a5722c4ad59..30152dd4a5bb4 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,7 +322,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - getMapSizesByExecutorId(shuffleId, MapOutputTracker.allMapStatus, reduceId, reduceId + 1) + getMapSizesByExecutorId(shuffleId, MapOutputTracker.allMapStatuses, reduceId, reduceId + 1) } /** @@ -846,8 +846,8 @@ private[spark] object MapOutputTracker extends Logging { private val DIRECT = 0 private val BROADCAST = 1 - // return the mapIndexRange[startMapIndex, endMapIndex), which includes all the MapStatus - val allMapStatus: Array[MapStatus] => (Int, Int) = mapStatus => (0, mapStatus.length) + // return the mapIndexRange[startMapIndex, endMapIndex), which includes all MapStatuses + val allMapStatuses: Array[MapStatus] => (Int, Int) = mapStatus => (0, mapStatus.length) // Serialize an array of map output locations into an efficient byte format so that we can send // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. 
They will diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 00dbe5dd721e5..75ba0483e13b1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -146,7 +146,7 @@ class CoGroupedRDD[K: ClassTag]( val it = SparkEnv.get.shuffleManager .getReader( shuffleDependency.shuffleHandle, - MapOutputTracker.allMapStatus, + MapOutputTracker.allMapStatuses, split.index, split.index + 1, context, metrics) .read() rddIterators += ((it, depNum)) diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index b2290da30f2c1..75abe4f69af87 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -103,7 +103,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dep.shuffleHandle, MapOutputTracker.allMapStatus, + dep.shuffleHandle, MapOutputTracker.allMapStatuses, split.index, split.index + 1, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala index 67cc53298634e..1367f0212366a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala @@ -105,7 +105,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag]( val iter = SparkEnv.get.shuffleManager .getReader( shuffleDependency.shuffleHandle, - MapOutputTracker.allMapStatus, + MapOutputTracker.allMapStatuses, partition.index, partition.index + 1, context, diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index a2940eefb65ca..350395a11d65f 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -411,7 +411,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val taskContext = new TaskContextImpl( 1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem) val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() - val reader = manager.getReader[Int, Int](shuffleHandle, MapOutputTracker.allMapStatus, + val reader = manager.getReader[Int, Int](shuffleHandle, MapOutputTracker.allMapStatuses, 0, 1, taskContext, metrics) TaskContext.unset() val readData = reader.read().toIndexedSeq diff --git a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala index a1666c949f8f3..4af5e88f181b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala @@ -106,7 +106,7 @@ class CustomShuffledRDD[K, V, C]( val part = p.asInstanceOf[CustomShuffledRDDPartition] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dependency.shuffleHandle, MapOutputTracker.allMapStatus, part.startIndexInParent, + dependency.shuffleHandle, MapOutputTracker.allMapStatuses, part.startIndexInParent, 
part.endIndexInParent, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 5601bd6a42b2b..3472ac147703d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -185,7 +185,7 @@ class ShuffledRowRDD( case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, - MapOutputTracker.allMapStatus, + MapOutputTracker.allMapStatuses, startReducerIndex, endReducerIndex, context, From 05fe4c7be3a7b55d7a04e461e9c79c92031e627c Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 10:40:31 +0800 Subject: [PATCH 06/15] update comment --- .../main/scala/org/apache/spark/shuffle/ShuffleManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 7443c120e4edc..45e30bc4b79d7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -46,7 +46,9 @@ private[spark] trait ShuffleManager { /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to - * read from map output which specified by mapIndexRange. + * read from a range of map outputs which specified by mapIndexRange(startMapIndex to + * endMapIndex-1, inclusive). + * * Called on executors by reduce tasks. */ def getReader[K, C]( From 2a31450f341ac5d4fc46817dd65248b5d973c002 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 14:29:56 +0800 Subject: [PATCH 07/15] use start/endMapIndex directly --- .../org/apache/spark/MapOutputTracker.scala | 26 +++++++++---------- .../org/apache/spark/rdd/CoGroupedRDD.scala | 3 +-- .../org/apache/spark/rdd/ShuffledRDD.scala | 2 +- .../org/apache/spark/rdd/SubtractedRDD.scala | 3 ++- .../apache/spark/shuffle/ShuffleManager.scala | 8 +++--- .../shuffle/sort/SortShuffleManager.scala | 11 +++++--- .../apache/spark/MapOutputTrackerSuite.scala | 2 +- .../scala/org/apache/spark/ShuffleSuite.scala | 2 +- .../spark/scheduler/CustomShuffledRDD.scala | 2 +- .../BlockStoreShuffleReaderSuite.scala | 4 +-- .../spark/sql/execution/ShuffledRowRDD.scala | 9 ++++--- 11 files changed, 41 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 30152dd4a5bb4..653838363c59c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,7 +322,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - getMapSizesByExecutorId(shuffleId, MapOutputTracker.allMapStatuses, reduceId, reduceId + 1) + getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1) } /** @@ -336,7 +336,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging */ def getMapSizesByExecutorId( shuffleId: Int, - mapIndexRange: Array[MapStatus] => (Int, Int), + startMapIndex: Int, + endMapIndex: Int, 
startPartition: Int, endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] @@ -721,7 +722,8 @@ private[spark] class MapOutputTrackerMaster( // This method is only called in local-mode. def getMapSizesByExecutorId( shuffleId: Int, - mapIndexRange: Array[MapStatus] => (Int, Int), + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { @@ -729,11 +731,11 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => shuffleStatus.withMapStatuses { statuses => - val (startMapIndex, endMapIndex) = mapIndexRange(statuses) + val endMapIndex0 = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex logDebug(s"Convert map statuses for shuffle $shuffleId, " + - s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex") + s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex0") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) + shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex0) } case None => Iterator.empty @@ -769,18 +771,19 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. override def getMapSizesByExecutorId( shuffleId: Int, - mapIndexRange: Array[MapStatus] => (Int, Int), + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId") val statuses = getStatuses(shuffleId, conf) try { - val (startMapIndex, endMapIndex) = mapIndexRange(statuses) + val endMapIndex0 = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex logDebug(s"Convert map statuses for shuffle $shuffleId, " + - s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex") + s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex0") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) + shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex0) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -846,9 +849,6 @@ private[spark] object MapOutputTracker extends Logging { private val DIRECT = 0 private val BROADCAST = 1 - // return the mapIndexRange[startMapIndex, endMapIndex), which includes all MapStatuses - val allMapStatuses: Array[MapStatus] => (Int, Int) = mapStatus => (0, mapStatus.length) - // Serialize an array of map output locations into an efficient byte format so that we can send // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will // generally be pretty compressible because many map outputs will be on the same hostname. 
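With the lambda dropped, "all mappers" is now encoded as endMapIndex = Int.MaxValue and resolved against the registered statuses inside getMapSizesByExecutorId, as the hunks above do with endMapIndex0. A self-contained sketch of that resolution (plain Scala, illustrative names):

    object EndMapIndexSentinelSketch {
      type MapStatus = AnyRef

      // Mirrors the clamp performed before converting map statuses: a caller that
      // wants every mapper passes Int.MaxValue, which becomes statuses.length once
      // the tracker knows how many map outputs are registered.
      def resolveEndMapIndex(endMapIndex: Int, statuses: Array[MapStatus]): Int =
        if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex

      def main(args: Array[String]): Unit = {
        val statuses = Array.fill[MapStatus](7)(new Object)
        println(resolveEndMapIndex(Int.MaxValue, statuses)) // 7 -- "all mappers"
        println(resolveEndMapIndex(4, statuses))            // 4 -- explicit sub-range
      }
    }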
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 75ba0483e13b1..b9c94f5291312 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -145,8 +145,7 @@ class CoGroupedRDD[K: ClassTag]( val metrics = context.taskMetrics().createTempShuffleReadMetrics() val it = SparkEnv.get.shuffleManager .getReader( - shuffleDependency.shuffleHandle, - MapOutputTracker.allMapStatuses, + shuffleDependency.shuffleHandle, 0, Int.MaxValue, split.index, split.index + 1, context, metrics) .read() rddIterators += ((it, depNum)) diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index 75abe4f69af87..1fef0ee07bf3d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -103,7 +103,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dep.shuffleHandle, MapOutputTracker.allMapStatuses, + dep.shuffleHandle, 0, Int.MaxValue, split.index, split.index + 1, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala index 1367f0212366a..9aa353e16ee8a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala @@ -105,7 +105,8 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag]( val iter = SparkEnv.get.shuffleManager .getReader( shuffleDependency.shuffleHandle, - MapOutputTracker.allMapStatuses, + 0, + Int.MaxValue, partition.index, partition.index + 1, context, diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 45e30bc4b79d7..c608c834f7f3e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -46,14 +46,16 @@ private[spark] trait ShuffleManager { /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to - * read from a range of map outputs which specified by mapIndexRange(startMapIndex to - * endMapIndex-1, inclusive). + * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive). + * If endMapIndex=Int.MaxValue, the real endMapIndex will be changed to the length of total map + * outputs of the shuffle in `getMapSizesByExecutorId`. * * Called on executors by reduce tasks. 
*/ def getReader[K, C]( handle: ShuffleHandle, - mapIndexRange: Array[MapStatus] => (Int, Int), + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 5f7d5801338a6..063c67a05b863 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -116,18 +116,23 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager } /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive). + * If endMapIndex=Int.MaxValue, the real endMapIndex will be changed to the length of total map + * outputs of the shuffle in `getMapSizesByExecutorId`. + * * Called on executors by reduce tasks. */ override def getReader[K, C]( handle: ShuffleHandle, - mapIndexRange: Array[MapStatus] => (Int, Int), + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( - handle.shuffleId, mapIndexRange, startPartition, endPartition) + handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 67d59cc243345..630ffd9baa06e 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -317,7 +317,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(size10000, size0, size1000, size0), 6)) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByExecutorId(10, _ => (0, 2), 0, 4).toSeq === + assert(tracker.getMapSizesByExecutorId(10, 0, 2, 0, 4).toSeq === Seq( (BlockManagerId("a", "hostA", 1000), Seq((ShuffleBlockId(10, 5, 1), size1000, 0), diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 350395a11d65f..cc15277df24f3 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -411,7 +411,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val taskContext = new TaskContextImpl( 1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem) val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() - val reader = manager.getReader[Int, Int](shuffleHandle, MapOutputTracker.allMapStatuses, + val reader = manager.getReader[Int, Int](shuffleHandle, 0, Int.MaxValue, 0, 1, taskContext, metrics) TaskContext.unset() val readData = reader.read().toIndexedSeq diff --git a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala 
b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala index 4af5e88f181b5..4c295f62c1e82 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala @@ -106,7 +106,7 @@ class CustomShuffledRDD[K, V, C]( val part = p.asInstanceOf[CustomShuffledRDDPartition] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader( - dependency.shuffleHandle, MapOutputTracker.allMapStatuses, part.startIndexInParent, + dependency.shuffleHandle, 0, Int.MaxValue, part.startIndexInParent, part.endIndexInParent, context, metrics) .read() .asInstanceOf[Iterator[(K, C)]] diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 14aa16c2a2f53..d964b28df2983 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -104,7 +104,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) when(mapOutputTracker.getMapSizesByExecutorId( - shuffleId, null, reduceId, reduceId + 1)).thenReturn { + shuffleId, 0, numMaps, reduceId, reduceId + 1)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => @@ -132,7 +132,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext val taskContext = TaskContext.empty() val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId( - shuffleId, null, reduceId, reduceId + 1) + shuffleId, 0, numMaps, reduceId, reduceId + 1) val shuffleReader = new BlockStoreShuffleReader( shuffleHandle, blocksByAddress, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 3472ac147703d..13d863ad79b70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -185,7 +185,8 @@ class ShuffledRowRDD( case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, - MapOutputTracker.allMapStatuses, + 0, + Int.MaxValue, startReducerIndex, endReducerIndex, context, @@ -194,7 +195,8 @@ class ShuffledRowRDD( case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) => SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, - _ => (startMapIndex, endMapIndex), + startMapIndex, + endMapIndex, reducerIndex, reducerIndex + 1, context, @@ -203,7 +205,8 @@ class ShuffledRowRDD( case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) => SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, - _ => (mapIndex, mapIndex + 1), + mapIndex, + mapIndex + 1, startReducerIndex, endReducerIndex, context, From 819e3e93f776f01bc103743419c2208281b24fc6 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 15:17:38 +0800 Subject: [PATCH 08/15] remove unused import --- 
.../src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index c608c834f7f3e..e55b36f26c2af 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -18,7 +18,6 @@ package org.apache.spark.shuffle import org.apache.spark.{ShuffleDependency, TaskContext} -import org.apache.spark.scheduler.MapStatus /** * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver From a487a32f3228d32a9321fd4b7d35d1fa48a47591 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 15:35:49 +0800 Subject: [PATCH 09/15] endMapIndex0 -> actualEndMapIndex --- .../scala/org/apache/spark/MapOutputTracker.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 653838363c59c..b0e3883638d8c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -731,11 +731,11 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => shuffleStatus.withMapStatuses { statuses => - val endMapIndex0 = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex + val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex logDebug(s"Convert map statuses for shuffle $shuffleId, " + - s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex0") + s"partitions $startPartition-$endPartition, mappers $startMapIndex-$actualEndMapIndex") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex0) + shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } case None => Iterator.empty @@ -779,11 +779,11 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr logDebug(s"Fetching outputs for shuffle $shuffleId") val statuses = getStatuses(shuffleId, conf) try { - val endMapIndex0 = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex + val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex logDebug(s"Convert map statuses for shuffle $shuffleId, " + - s"partitions $startPartition-$endPartition, mappers $startMapIndex-$endMapIndex0") + s"partitions $startPartition-$endPartition, mappers $startMapIndex-$actualEndMapIndex") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex0) + shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: From 62cc46cd4dfd2de5534b9b102b2b53eacd0100bc Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 23 Jun 2020 15:37:07 +0800 Subject: [PATCH 10/15] mappers before partitions --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index b0e3883638d8c..0d7561827f078 100644 
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -733,7 +733,7 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatus.withMapStatuses { statuses =>
           val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
           logDebug(s"Convert map statuses for shuffle $shuffleId, " +
-            s"partitions $startPartition-$endPartition, mappers $startMapIndex-$actualEndMapIndex")
+            s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
           MapOutputTracker.convertMapStatuses(
             shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
         }
@@ -781,7 +781,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     try {
       val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
       logDebug(s"Convert map statuses for shuffle $shuffleId, " +
-        s"partitions $startPartition-$endPartition, mappers $startMapIndex-$actualEndMapIndex")
+        s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
       MapOutputTracker.convertMapStatuses(
         shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
     } catch {

From 109eeb39f14f0f44d593cb38df407c82da9d0572 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 23 Jun 2020 15:56:59 +0800
Subject: [PATCH 11/15] overload getReader

---
 .../org/apache/spark/rdd/CoGroupedRDD.scala       |  3 +--
 .../scala/org/apache/spark/rdd/ShuffledRDD.scala  |  3 +--
 .../org/apache/spark/rdd/SubtractedRDD.scala      |  2 --
 .../apache/spark/shuffle/ShuffleManager.scala     | 16 ++++++++++++++++
 .../scala/org/apache/spark/ShuffleSuite.scala     |  3 +--
 .../spark/scheduler/CustomShuffledRDD.scala       |  3 +--
 .../spark/sql/execution/ShuffledRowRDD.scala      |  2 --
 7 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index b9c94f5291312..500d306f336ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -145,8 +145,7 @@ class CoGroupedRDD[K: ClassTag](
         val metrics = context.taskMetrics().createTempShuffleReadMetrics()
         val it = SparkEnv.get.shuffleManager
           .getReader(
-            shuffleDependency.shuffleHandle, 0, Int.MaxValue,
-            split.index, split.index + 1, context, metrics)
+            shuffleDependency.shuffleHandle, split.index, split.index + 1, context, metrics)
           .read()
         rddIterators += ((it, depNum))
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 1fef0ee07bf3d..0930a5c9cfb96 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -103,8 +103,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
     val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
     val metrics = context.taskMetrics().createTempShuffleReadMetrics()
     SparkEnv.get.shuffleManager.getReader(
-      dep.shuffleHandle, 0, Int.MaxValue,
-      split.index, split.index + 1, context, metrics)
+      dep.shuffleHandle, split.index, split.index + 1, context, metrics)
       .read()
       .asInstanceOf[Iterator[(K, C)]]
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 9aa353e16ee8a..41839741c538b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -105,8 +105,6 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
         val iter = SparkEnv.get.shuffleManager
           .getReader(
             shuffleDependency.shuffleHandle,
-            0,
-            Int.MaxValue,
             partition.index,
             partition.index + 1,
             context,
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index e55b36f26c2af..cf4ff9746d096 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -43,6 +43,22 @@ private[spark] trait ShuffleManager {
       context: TaskContext,
       metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
 
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
+   * read from all map outputs of the shuffle.
+   *
+   * Called on executors by reduce tasks.
+   */
+  final def getReader[K, C](
+      handle: ShuffleHandle,
+      startPartition: Int,
+      endPartition: Int,
+      context: TaskContext,
+      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
+    getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)
+  }
+
   /**
    * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
    * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive).
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index cc15277df24f3..9e39271bdf9ee 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -411,8 +411,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     val taskContext = new TaskContextImpl(
       1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem)
     val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
-    val reader = manager.getReader[Int, Int](shuffleHandle, 0, Int.MaxValue,
-      0, 1, taskContext, metrics)
+    val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1, taskContext, metrics)
     TaskContext.unset()
     val readData = reader.read().toIndexedSeq
     assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
index 4c295f62c1e82..46e5e6f97b1f1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala
@@ -106,8 +106,7 @@ class CustomShuffledRDD[K, V, C](
     val part = p.asInstanceOf[CustomShuffledRDDPartition]
     val metrics = context.taskMetrics().createTempShuffleReadMetrics()
     SparkEnv.get.shuffleManager.getReader(
-      dependency.shuffleHandle, 0, Int.MaxValue, part.startIndexInParent,
-      part.endIndexInParent, context, metrics)
+      dependency.shuffleHandle, part.startIndexInParent, part.endIndexInParent, context, metrics)
       .read()
       .asInstanceOf[Iterator[(K, C)]]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 13d863ad79b70..b5e9655a776b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -185,8 +185,6 @@ class ShuffledRowRDD(
       case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
         SparkEnv.get.shuffleManager.getReader(
           dependency.shuffleHandle,
-          0,
-          Int.MaxValue,
           startReducerIndex,
           endReducerIndex,
           context,

From fbef8d2936ce7353f2d94a35f1b42f37637438d4 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 23 Jun 2020 20:03:21 +0800
Subject: [PATCH 12/15] revert change

---
 .../main/scala/org/apache/spark/rdd/SubtractedRDD.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 41839741c538b..d5a811d4dc3fd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -23,7 +23,13 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Dependency, MapOutputTracker, OneToOneDependency, Partition, Partitioner, ShuffleDependency, SparkEnv, TaskContext}
+import org.apache.spark.Dependency
+import org.apache.spark.OneToOneDependency
+import org.apache.spark.Partition
+import org.apache.spark.Partitioner
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.SparkEnv
+import org.apache.spark.TaskContext
 
 /**
  * An optimized version of cogroup for set difference/subtraction.

From b7ec5836cf2ac89d80f82ad577ec04549ecbd301 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 23 Jun 2020 20:05:04 +0800
Subject: [PATCH 13/15] fix some style

---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 0d7561827f078..b624c7f984633 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -729,7 +729,7 @@ private[spark] class MapOutputTrackerMaster(
     : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     shuffleStatuses.get(shuffleId) match {
-      case Some (shuffleStatus) =>
+      case Some(shuffleStatus) =>
         shuffleStatus.withMapStatuses { statuses =>
           val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
           logDebug(s"Convert map statuses for shuffle $shuffleId, " +

From 29940501f8b5ffb845ad24c24d53234d71020881 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Wed, 24 Jun 2020 14:01:27 +0800
Subject: [PATCH 14/15] update

---
 .../main/scala/org/apache/spark/MapOutputTracker.scala | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index b624c7f984633..8bdfbe8f4da44 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -339,8 +339,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int)
-      : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
 
   /**
    * Deletes map output status information for the specified shuffle stage.
@@ -725,8 +724,7 @@ private[spark] class MapOutputTrackerMaster(
       startMapIndex: Int,
      endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int)
-      : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
@@ -774,8 +772,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int)
-      : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     val statuses = getStatuses(shuffleId, conf)
     try {

From 4247fa323f264af65609f7285e82f86b8dd9be60 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 29 Jun 2020 10:50:42 +0800
Subject: [PATCH 15/15] update comment

---
 .../src/main/scala/org/apache/spark/MapOutputTracker.scala | 7 ++++---
 .../scala/org/apache/spark/shuffle/ShuffleManager.scala    | 2 +-
 .../org/apache/spark/shuffle/sort/SortShuffleManager.scala | 2 +-
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 8bdfbe8f4da44..18cd5de4cfada 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -328,11 +328,14 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   /**
    * Called from executors to get the server URIs and output sizes for each shuffle block that
    * needs to be read from a given range of map output partitions (startPartition is included but
-   * endPartition is excluded from the range).
+   * endPartition is excluded from the range) within a range of mappers (startMapIndex is included
+   * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
+   * changed to the length of total map outputs.
    *
    * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
    *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
    *         tuples describing the shuffle blocks that are stored at that block manager.
+   *         Note that zero-sized blocks are excluded in the result.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
@@ -717,7 +720,6 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
-  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   // This method is only called in local-mode.
   def getMapSizesByExecutorId(
       shuffleId: Int,
@@ -766,7 +768,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    */
   private val fetchingLock = new KeyLock[Int]
 
-  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   override def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index cf4ff9746d096..400c4526f0114 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -62,7 +62,7 @@ private[spark] trait ShuffleManager {
   /**
    * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
    * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive).
-   * If endMapIndex=Int.MaxValue, the real endMapIndex will be changed to the length of total map
+   * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map
    * outputs of the shuffle in `getMapSizesByExecutorId`.
    *
    * Called on executors by reduce tasks.
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 063c67a05b863..72460180f5908 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -118,7 +118,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   /**
    * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
    * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive).
-   * If endMapIndex=Int.MaxValue, the real endMapIndex will be changed to the length of total map
+   * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map
    * outputs of the shuffle in `getMapSizesByExecutorId`.
    *
    * Called on executors by reduce tasks.
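
For reference, the shape of the change in [PATCH 11/15] can be summarized in a minimal, self-contained Scala sketch. The trait and type names below (SimpleShuffleReader, SimpleShuffleManager) are invented for illustration and are not the Spark API; the point is only the delegation pattern: the short getReader form forwards to the full map-range form with (0, Int.MaxValue), and the Int.MaxValue sentinel is resolved to the actual number of map outputs where the map statuses are available.

// Illustrative sketch only; names are hypothetical, not Spark classes.
trait SimpleShuffleReader[K, C] {
  def read(): Iterator[(K, C)]
}

trait SimpleShuffleManager {
  // Full variant: read reduce partitions [startPartition, endPartition)
  // produced by map outputs [startMapIndex, endMapIndex).
  def getReader[K, C](
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): SimpleShuffleReader[K, C]

  // Convenience overload: read the same reduce partitions from all map outputs.
  // Int.MaxValue acts as an "all maps" sentinel; downstream it is replaced by the
  // real map-output count, mirroring the actualEndMapIndex logic in the patches.
  final def getReader[K, C](startPartition: Int, endPartition: Int): SimpleShuffleReader[K, C] =
    getReader(0, Int.MaxValue, startPartition, endPartition)
}

A reduce task that wants a single partition p from every map output would call the two-argument form, getReader(p, p + 1), which is exactly how the updated ShuffledRDD, CoGroupedRDD, and SubtractedRDD call sites use the new overload.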