From 3ce85246201ca76cff29584bcc1853046b1d8285 Mon Sep 17 00:00:00 2001
From: peterpc0701
Date: Thu, 31 Mar 2016 21:07:20 +0800
Subject: [PATCH 1/3] Improve shuffle load balancing and minimize network data transmission.

---
 .../org/apache/spark/MapOutputTracker.scala   | 138 +++++++++++++++++-
 .../apache/spark/MapOutputTrackerSuite.scala  |  29 ++++
 2 files changed, 165 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 3a5caa3510eb..629db4286f55 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -356,8 +356,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       : Seq[String] = {
     if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
         dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
-      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
-        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
+      // Replace getLocationsWithLargestOutputs with getLocationsWithGlobalMode.
+      val blockManagerIds = getLocationsWithGlobalMode(dep.shuffleId,
+        dep.partitioner.numPartitions)
       if (blockManagerIds.nonEmpty) {
         blockManagerIds.get.map(_.host)
       } else {
@@ -421,6 +422,139 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     None
   }
 
+  /**
+   * Return a preferred location for every reducer, chosen so that the reduce load is balanced
+   * across locations and the amount of map output fetched over the network is minimized.
+   *
+   * @param shuffleId id of the shuffle
+   * @param numReducers total number of reducers in the shuffle
+   *
+   */
+  def getLocationsWithGlobalMode(
+      shuffleId: Int,
+      numReducers: Int
+    )
+    : Option[Array[BlockManagerId]] = {
+    val statuses = mapStatuses.get(shuffleId).orNull
+    assert(statuses != null)
+    val splitsByLocation = new HashMap[BlockManagerId, Array[Long]]
+    var sumOfAllBytes: Long = 0
+    statuses.foreach {
+      status =>
+        if (status == null) {
+          throw new MetadataFetchFailedException(
+            shuffleId, -1, "Missing an output location for shuffle " + shuffleId)
+        } else {
+          val location = status.location
+          if (!splitsByLocation.contains(location)) {
+            splitsByLocation(location) = new Array[Long](numReducers)
+          }
+          var i = 0
+          while (i < numReducers) {
+            val byteSize = status.getSizeForBlock(i)
+            splitsByLocation(location)(i) += byteSize
+            sumOfAllBytes += byteSize
+            i += 1
+          }
+        }
+    }
+    if (splitsByLocation.nonEmpty) {
+      val numOfLocations = splitsByLocation.size
+      val preferredLocationsOfReduces = new Array[BlockManagerId](numReducers)
+      val bytesOfReduces = new Array[Long](numReducers)
+      val blockManagerIdMaps = new HashMap[Int, BlockManagerId]
+      val splitIndexOfLocation = new Array[HashSet[Int]](numOfLocations)
+      var i = 0
+      var j = 0
+      // calculate the byte size of each reducer
+      splitsByLocation.toSeq.map(
+        s => {
+          val (blockManagerId, byteSize) = s
+          blockManagerIdMaps(i) = blockManagerId
+          splitIndexOfLocation(i) = new HashSet[Int]
+          j = 0
+          byteSize.map(
+            b => {
+              bytesOfReduces(j) += b
+              j += 1
+            })
+          i += 1
+        })
+
+      val indexOfBytesOfReduces = new HashMap[Int, Long]
+      for ((size, index) <- bytesOfReduces.zipWithIndex) {
+        indexOfBytesOfReduces.getOrElseUpdate(index, size)
+      }
+      val sortedIndexOfBytesOfReducer = indexOfBytesOfReduces.toSeq.sortWith(_._2 > _._2)
+      val splitSumOfByteSizeOfLocation = new Array[Long](numOfLocations)
+
+      // Divide the tasks into n groups according to the number of nodes and data size,
+      // ensuring that the data size for each group is nearly equal to achieve load balancing.
+      for (i <- sortedIndexOfBytesOfReducer.indices) {
+        var minIndex = 0
+        for (j <- 1 until numOfLocations) {
+          if (splitSumOfByteSizeOfLocation(j) < splitSumOfByteSizeOfLocation(minIndex)) {
+            minIndex = j
+          }
+        }
+        val (index, byteSize) = sortedIndexOfBytesOfReducer(i)
+        splitSumOfByteSizeOfLocation(minIndex) += byteSize
+        splitIndexOfLocation(minIndex).add(index)
+      }
+
+      // Determine how much data would be read locally if each group of tasks ran on each node.
+      // Thus, an n × n matrix is created.
+      val splitBytesOfLocationsAndGroup = new Array[Array[Long]](numOfLocations)
+      for (i <- splitBytesOfLocationsAndGroup.indices) {
+        splitBytesOfLocationsAndGroup(i) = new Array[Long](numOfLocations)
+      }
+      for (i <- splitIndexOfLocation.indices) {
+        val iter: Iterator[Int] = splitIndexOfLocation(i).iterator
+        while (iter.hasNext) {
+          val index = iter.next()
+          val bytesOfLocations: Seq[(BlockManagerId, Long)] = splitsByLocation.toSeq.map(s => (s._1, s._2(index)))
+          for (j <- bytesOfLocations.indices) {
+            splitBytesOfLocationsAndGroup(i)(j) += bytesOfLocations(j)._2
+          }
+        }
+      }
+      // Choose the largest value in the matrix to decide which group is allocated to which node.
+      // Mark the row and column at which the selected group is located to ensure that the group
+      // is not chosen again. Repeat until every group has been assigned.
+      for (i <- 0 until numOfLocations) {
+        var maxCol = 0
+        var maxRow = 0
+        var maxValue = splitBytesOfLocationsAndGroup(maxRow)(maxCol)
+        for (j <- splitBytesOfLocationsAndGroup.indices) {
+          for (k <- splitBytesOfLocationsAndGroup(j).indices) {
+            if (splitBytesOfLocationsAndGroup(j)(k) > maxValue) {
+              maxRow = j
+              maxCol = k
+              maxValue = splitBytesOfLocationsAndGroup(j)(k)
+            }
+          }
+        }
+        val iter: Iterator[Int] = splitIndexOfLocation(maxRow).iterator
+        while (iter.hasNext) {
+          val index = iter.next()
+          preferredLocationsOfReduces(index) = blockManagerIdMaps(maxCol)
+        }
+        for (j <- splitBytesOfLocationsAndGroup.indices) {
+          splitBytesOfLocationsAndGroup(j)(maxCol) = -1
+        }
+        for (k <- splitBytesOfLocationsAndGroup.indices) {
+          splitBytesOfLocationsAndGroup(maxRow)(k) = -1
+        }
+      }
+      Some(preferredLocationsOfReduces)
+    }
+    else
+      None
+  }
+
+
+
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index ddf48765ec30..468021b48dac 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -242,4 +242,33 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     tracker.stop()
     rpcEnv.shutdown()
   }
+
+
+  test("getLocationsWithGlobalMode with multiple outputs") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = new MapOutputTrackerMaster(conf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    // Setup 4 map tasks
+    tracker.registerShuffle(10, 4)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(1L, 2L, 3L, 4L, 5L, 6L, 7L)))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
+      Array(2L, 8L, 2L, 5L, 2L, 2L, 9L)))
+    tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("c", "hostC", 1000),
+      Array(3L, 2L, 7L, 4L, 2L, 6L, 1L)))
+    tracker.registerMapOutput(10, 3, MapStatus(BlockManagerId("c", "hostC", 1000),
+      Array(8L, 5L, 3L, 4L, 7L, 2L, 5L)))
+
+    val topLocs = tracker.getLocationsWithGlobalMode(10, 7)
+    assert(topLocs.nonEmpty)
+    assert(topLocs.get.size === 7)
+    assert(topLocs.get ===
+      Array(BlockManagerId("c", "hostC", 1000), BlockManagerId("c", "hostC", 1000), BlockManagerId("b", "hostB", 1000),
+        BlockManagerId("a", "hostA", 1000), BlockManagerId("a", "hostA", 1000), BlockManagerId("c", "hostC", 1000),
+        BlockManagerId("b", "hostB", 1000)))
+
+    tracker.stop()
+    rpcEnv.shutdown()
+  }
 }

From 9cd2294171ec06a059f6b9922be0b4a6d9b89f7d Mon Sep 17 00:00:00 2001
From: peterpc0701
Date: Thu, 31 Mar 2016 21:47:12 +0800
Subject: [PATCH 2/3] Improve shuffle load balancing and minimize network data transmission.

---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 629db4286f55..6d5f14925b8a 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -430,11 +430,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
    * @param numReducers total number of reducers in the shuffle
    *
    */
-  def getLocationsWithGlobalMode(
-      shuffleId: Int,
-      numReducers: Int
-    )
-    : Option[Array[BlockManagerId]] = {
+  def getLocationsWithGlobalMode(shuffleId: Int, numReducers: Int): Option[Array[BlockManagerId]] = {
     val statuses = mapStatuses.get(shuffleId).orNull
     assert(statuses != null)
     val splitsByLocation = new HashMap[BlockManagerId, Array[Long]]

From 5fe80e6eaaba54116d57fc60566c33ce976f8492 Mon Sep 17 00:00:00 2001
From: peterpc0701
Date: Sun, 29 May 2016 15:21:42 +0800
Subject: [PATCH 3/3] Fix some minor formatting issues

---
 .../org/apache/spark/MapOutputTracker.scala   | 80 +++++++++----------
 1 file changed, 37 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6d5f14925b8a..899591f0024b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -445,12 +445,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
           if (!splitsByLocation.contains(location)) {
             splitsByLocation(location) = new Array[Long](numReducers)
           }
-          var i = 0
-          while (i < numReducers) {
-            val byteSize = status.getSizeForBlock(i)
-            splitsByLocation(location)(i) += byteSize
+          for (index <- 0 until numReducers) {
+            val byteSize = status.getSizeForBlock(index)
+            splitsByLocation(location)(index) += byteSize
             sumOfAllBytes += byteSize
-            i += 1
           }
         }
     }
@@ -460,21 +458,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       val bytesOfReduces = new Array[Long](numReducers)
       val blockManagerIdMaps = new HashMap[Int, BlockManagerId]
       val splitIndexOfLocation = new Array[HashSet[Int]](numOfLocations)
-      var i = 0
-      var j = 0
+      var locIndex = 0
       // calculate the byte size of each reducer
       splitsByLocation.toSeq.map(
-        s => {
-          val (blockManagerId, byteSize) = s
-          blockManagerIdMaps(i) = blockManagerId
-          splitIndexOfLocation(i) = new HashSet[Int]
-          j = 0
-          byteSize.map(
-            b => {
-              bytesOfReduces(j) += b
-              j += 1
-            })
-          i += 1
+        kvItems => {
+          val (blockManagerId, byteSizes) = kvItems
+          blockManagerIdMaps(locIndex) = blockManagerId
+          splitIndexOfLocation(locIndex) = new HashSet[Int]
+          for (index <- 0 until byteSizes.length) {
+            bytesOfReduces(index) += byteSizes(index)
+          }
+          locIndex += 1
         })
 
       val indexOfBytesOfReduces = new HashMap[Int, Long]
@@ -486,47 +480,47 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
 
       // Divide the tasks into n groups according to the number of nodes and data size,
       // ensuring that the data size for each group is nearly equal to achieve load balancing.
-      for (i <- sortedIndexOfBytesOfReducer.indices) {
-        var minIndex = 0
-        for (j <- 1 until numOfLocations) {
-          if (splitSumOfByteSizeOfLocation(j) < splitSumOfByteSizeOfLocation(minIndex)) {
-            minIndex = j
+      for (index <- sortedIndexOfBytesOfReducer.indices) {
+        var minLocIndex = 0
+        for (locIndex <- 1 until numOfLocations) {
+          if (splitSumOfByteSizeOfLocation(locIndex) < splitSumOfByteSizeOfLocation(minLocIndex)) {
+            minLocIndex = locIndex
           }
         }
-        val (index, byteSize) = sortedIndexOfBytesOfReducer(i)
-        splitSumOfByteSizeOfLocation(minIndex) += byteSize
-        splitIndexOfLocation(minIndex).add(index)
+        val (loc, byteSize) = sortedIndexOfBytesOfReducer(index)
+        splitSumOfByteSizeOfLocation(minLocIndex) += byteSize
+        splitIndexOfLocation(minLocIndex).add(loc)
       }
 
       // Determine how much data would be read locally if each group of tasks ran on each node.
       // Thus, an n × n matrix is created.
       val splitBytesOfLocationsAndGroup = new Array[Array[Long]](numOfLocations)
-      for (i <- splitBytesOfLocationsAndGroup.indices) {
-        splitBytesOfLocationsAndGroup(i) = new Array[Long](numOfLocations)
+      for (index <- splitBytesOfLocationsAndGroup.indices) {
+        splitBytesOfLocationsAndGroup(index) = new Array[Long](numOfLocations)
       }
-      for (i <- splitIndexOfLocation.indices) {
-        val iter: Iterator[Int] = splitIndexOfLocation(i).iterator
+      for (row <- splitIndexOfLocation.indices) {
+        val iter: Iterator[Int] = splitIndexOfLocation(row).iterator
         while (iter.hasNext) {
           val index = iter.next()
           val bytesOfLocations: Seq[(BlockManagerId, Long)] = splitsByLocation.toSeq.map(s => (s._1, s._2(index)))
-          for (j <- bytesOfLocations.indices) {
-            splitBytesOfLocationsAndGroup(i)(j) += bytesOfLocations(j)._2
+          for (col <- bytesOfLocations.indices) {
+            splitBytesOfLocationsAndGroup(row)(col) += bytesOfLocations(col)._2
           }
         }
       }
       // Choose the largest value in the matrix to decide which group is allocated to which node.
       // Mark the row and column at which the selected group is located to ensure that the group
       // is not chosen again. Repeat until every group has been assigned.
-      for (i <- 0 until numOfLocations) {
+      for (index <- 0 until numOfLocations) {
         var maxCol = 0
         var maxRow = 0
         var maxValue = splitBytesOfLocationsAndGroup(maxRow)(maxCol)
-        for (j <- splitBytesOfLocationsAndGroup.indices) {
-          for (k <- splitBytesOfLocationsAndGroup(j).indices) {
-            if (splitBytesOfLocationsAndGroup(j)(k) > maxValue) {
-              maxRow = j
-              maxCol = k
-              maxValue = splitBytesOfLocationsAndGroup(j)(k)
+        for (row <- splitBytesOfLocationsAndGroup.indices) {
+          for (col <- splitBytesOfLocationsAndGroup(row).indices) {
+            if (splitBytesOfLocationsAndGroup(row)(col) > maxValue) {
+              maxRow = row
+              maxCol = col
+              maxValue = splitBytesOfLocationsAndGroup(row)(col)
             }
           }
         }
@@ -535,11 +529,11 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
           val index = iter.next()
           preferredLocationsOfReduces(index) = blockManagerIdMaps(maxCol)
         }
-        for (j <- splitBytesOfLocationsAndGroup.indices) {
-          splitBytesOfLocationsAndGroup(j)(maxCol) = -1
+        for (row <- splitBytesOfLocationsAndGroup.indices) {
+          splitBytesOfLocationsAndGroup(row)(maxCol) = -1
         }
-        for (k <- splitBytesOfLocationsAndGroup.indices) {
-          splitBytesOfLocationsAndGroup(maxRow)(k) = -1
+        for (col <- splitBytesOfLocationsAndGroup.indices) {
+          splitBytesOfLocationsAndGroup(maxRow)(col) = -1
         }
       }
       Some(preferredLocationsOfReduces)
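
Note: for readers who want to see the placement strategy without the MapOutputTracker plumbing, the standalone sketch below re-implements the two phases described in the comments of getLocationsWithGlobalMode: first a greedy grouping of reducers into one group per location with roughly equal total size, then a group-to-location assignment that repeatedly picks the largest cell of the group × location "local bytes" matrix and invalidates its row and column. This is only an illustrative sketch under assumed names (GlobalModePlacementSketch, preferredLocations, plain host strings instead of BlockManagerId); its tie-breaking and iteration order may differ from the patched method, so it is not guaranteed to reproduce the exact array expected by the new unit test.

import scala.collection.mutable

object GlobalModePlacementSketch {

  // For every reducer index, return the location it should preferably run on.
  def preferredLocations(bytesByLocation: Map[String, Array[Long]], numReducers: Int): Array[String] = {
    val locations = bytesByLocation.keys.toArray
    val numLocations = locations.length

    // Total output produced for each reducer, summed over all map locations.
    val totalBytesOfReducer = Array.tabulate(numReducers) { r =>
      bytesByLocation.values.map(_(r)).sum
    }

    // Phase 1: greedy grouping. Walk the reducers from largest to smallest and always
    // drop the next reducer into the currently lightest group, producing one group per
    // location with roughly equal total size (load balancing).
    val groups = Array.fill(numLocations)(mutable.ArrayBuffer.empty[Int])
    val groupSize = new Array[Long](numLocations)
    val reducersBySizeDesc =
      totalBytesOfReducer.indices.sortBy(r => totalBytesOfReducer(r))(Ordering[Long].reverse)
    for (r <- reducersBySizeDesc) {
      val lightest = groupSize.indices.minBy(i => groupSize(i))
      groups(lightest) += r
      groupSize(lightest) += totalBytesOfReducer(r)
    }

    // Phase 2: build the group × location matrix of bytes that would be read locally if
    // the group ran at that location, then repeatedly take the largest remaining cell,
    // pin that group to that location, and invalidate its row and column.
    val localBytes = Array.tabulate(numLocations, numLocations) { (g, l) =>
      groups(g).map(r => bytesByLocation(locations(l))(r)).sum
    }
    val preferred = new Array[String](numReducers)
    for (_ <- 0 until numLocations) {
      val cells = for {
        g <- 0 until numLocations
        l <- 0 until numLocations
      } yield (g, l)
      val (bestGroup, bestLoc) = cells.maxBy { case (g, l) => localBytes(g)(l) }
      groups(bestGroup).foreach(r => preferred(r) = locations(bestLoc))
      (0 until numLocations).foreach(l => localBytes(bestGroup)(l) = -1L)
      (0 until numLocations).foreach(g => localBytes(g)(bestLoc) = -1L)
    }
    preferred
  }

  def main(args: Array[String]): Unit = {
    // Per-host totals corresponding to the new unit test: hostC holds two map outputs,
    // so its per-reducer sizes are the element-wise sum of map outputs 2 and 3.
    val bytesByLocation = Map(
      "hostA" -> Array(1L, 2L, 3L, 4L, 5L, 6L, 7L),
      "hostB" -> Array(2L, 8L, 2L, 5L, 2L, 2L, 9L),
      "hostC" -> Array(11L, 7L, 10L, 8L, 9L, 8L, 6L))
    println(preferredLocations(bytesByLocation, numReducers = 7).mkString(", "))
  }
}

Feeding it the same per-host totals as the unit test makes it easy to compare the balance of the three groups and the chosen assignments against what the patched MapOutputTrackerMaster returns.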