Closed
32 commits
4e31bb7
Use map output statistics to improve global limit's parallelism.
viirya Jan 23, 2017
b049cc4
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Feb 4, 2017
45a1fcb
Use Long for number of outputs. Turn to the approach of calculating n…
viirya Feb 27, 2017
e9679ba
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Mar 1, 2017
1a56252
Rebased with latest change.
viirya Mar 1, 2017
df44243
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Mar 7, 2017
2d37598
Changed Limit outputs different results. It affects the test case out…
viirya Mar 7, 2017
b8a2275
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Apr 11, 2017
867a93d
Address comments.
viirya May 11, 2017
55ee6b0
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya May 18, 2017
8f779ac
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Jun 21, 2017
f2a7aac
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Sep 11, 2017
7598337
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Oct 31, 2017
e53648e
ShuffleExchange becomes ShuffleExchangeExec now.
viirya Oct 31, 2017
062b8fd
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya May 7, 2018
47f6031
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya May 7, 2018
a691e88
Fix merging conflict.
viirya May 7, 2018
5594bf9
Avoid evenly scanning partitions when child output has ordering.
viirya May 10, 2018
c9c8be6
Some refactoring.
viirya May 10, 2018
ca00701
Disable global limit optimization in limit sql query test.
viirya Jun 22, 2018
21b6948
Use array instead of map.
viirya Jun 22, 2018
59a3029
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Jun 22, 2018
4b443cc
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Jun 22, 2018
1ff1fa5
Resolve merging issue.
viirya Jun 22, 2018
a737573
Address comment.
viirya Jun 22, 2018
b0cca1a
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Jun 22, 2018
f24171e
Address comment.
viirya Jun 23, 2018
2d522b4
Use orgPartition.numPartitions.
viirya Jun 26, 2018
9792220
Use childRDD.
viirya Jun 26, 2018
19d7d75
Use writeMetrics.recordsWritten.
viirya Jun 28, 2018
d05c144
Revert unused change.
viirya Jul 24, 2018
69513d1
Merge remote-tracking branch 'upstream/master' into improve-global-li…
viirya Aug 4, 2018
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, 0);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -167,7 +167,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
}

@VisibleForTesting
@@ -248,7 +248,8 @@ void closeAndWriteOutput() throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
}

@VisibleForTesting
@@ -23,5 +23,9 @@ package org.apache.spark
* @param shuffleId ID of the shuffle
* @param bytesByPartitionId approximate number of output bytes for each map output partition
* (may be inexact due to use of compressed map statuses)
* @param recordsByPartitionId number of output records for each map output partition
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
private[spark] class MapOutputStatistics(
val shuffleId: Int,
val bytesByPartitionId: Array[Long],
val recordsByPartitionId: Array[Long])
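For context, here is a minimal, self-contained sketch (simplified stand-in names, not Spark's actual internals) of how a caller could combine the existing byte statistics with the new per-partition record counts, for example to compare a shuffle's total output against a LIMIT value:

```scala
// Simplified stand-in for the extended MapOutputStatistics above; illustrative only.
case class SimpleMapOutputStatistics(
    shuffleId: Int,
    bytesByPartitionId: Array[Long],
    recordsByPartitionId: Array[Long])

object StatsUsageSketch {
  def main(args: Array[String]): Unit = {
    val stats = SimpleMapOutputStatistics(
      shuffleId = 10,
      bytesByPartitionId = Array(2048L, 0L, 512L),
      recordsByPartitionId = Array(100L, 40L, 3L))

    // Total records written by the map stage; a planner could use this to decide
    // how many partitions actually need to be read to satisfy a global limit.
    val totalRecords = stats.recordsByPartitionId.sum
    println(s"shuffle ${stats.shuffleId}: $totalRecords records, " +
      s"${stats.bytesByPartitionId.sum} bytes")
  }
}
```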
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -522,16 +522,19 @@ private[spark] class MapOutputTrackerMaster(
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
val recordsByMapTask = new Array[Long](statuses.length)

val parallelAggThreshold = conf.get(
SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
val parallelism = math.min(
Runtime.getRuntime.availableProcessors(),
statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
if (parallelism <= 1) {
for (s <- statuses) {
statuses.zipWithIndex.foreach { case (s, index) =>
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
}
recordsByMapTask(index) = s.numberOfOutput
}
} else {
val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
@@ -548,8 +551,11 @@ private[spark] class MapOutputTrackerMaster(
} finally {
threadPool.shutdown()
}
statuses.zipWithIndex.foreach { case (s, index) =>
recordsByMapTask(index) = s.numberOfOutput
}
}
new MapOutputStatistics(dep.shuffleId, totalSizes)
new MapOutputStatistics(dep.shuffleId, totalSizes, recordsByMapTask)
}
}

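A stripped-down sketch of the aggregation in getStatistics above, using a hypothetical MapTaskStatus in place of MapStatus, showing how per-reducer byte totals and per-map-task record counts are collected in a single pass over the statuses:

```scala
object AggregationSketch {
  // Hypothetical stand-in for MapStatus: block sizes per reducer plus the number
  // of records the map task wrote (mirrors numberOfOutput in the diff above).
  final case class MapTaskStatus(sizesByReducer: Array[Long], numberOfOutput: Long)

  def aggregate(statuses: Array[MapTaskStatus], numReducers: Int): (Array[Long], Array[Long]) = {
    val totalSizes = new Array[Long](numReducers)            // bytes per reduce partition
    val recordsByMapTask = new Array[Long](statuses.length)  // records per map task
    statuses.zipWithIndex.foreach { case (s, index) =>
      var i = 0
      while (i < numReducers) {
        totalSizes(i) += s.sizesByReducer(i)
        i += 1
      }
      recordsByMapTask(index) = s.numberOfOutput
    }
    (totalSizes, recordsByMapTask)
  }

  def main(args: Array[String]): Unit = {
    val statuses = Array(
      MapTaskStatus(Array(1000L, 10000L), 10L),
      MapTaskStatus(Array(10000L, 1000L), 10L))
    val (sizes, records) = aggregate(statuses, numReducers = 2)
    println(s"sizes = ${sizes.mkString(",")}, records = ${records.mkString(",")}")
  }
}
```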
43 changes: 31 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -31,7 +31,8 @@ import org.apache.spark.util.Utils

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* task ran on, the sizes of outputs for each reducer, and the number of outputs of the map task,
* for passing on to the reduce tasks.
*/
private[spark] sealed trait MapStatus {
/** Location where this task was run. */
@@ -44,18 +45,23 @@ private[spark] sealed trait MapStatus {
* necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
*/
def getSizeForBlock(reduceId: Int): Long

/**
* The number of outputs for the map task.
*/
def numberOfOutput: Long
Contributor:
what does this mean? output blocks? output files?

}


private[spark] object MapStatus {

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long): MapStatus = {
if (uncompressedSizes.length > Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
HighlyCompressedMapStatus(loc, uncompressedSizes, numOutput)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
new CompressedMapStatus(loc, uncompressedSizes, numOutput)
}
}

@@ -98,29 +104,34 @@ private[spark] object MapStatus {
*/
private[spark] class CompressedMapStatus(
private[this] var loc: BlockManagerId,
private[this] var compressedSizes: Array[Byte])
private[this] var compressedSizes: Array[Byte],
private[this] var numOutput: Long)
extends MapStatus with Externalizable {

protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1) // For deserialization only

def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
this(loc, uncompressedSizes.map(MapStatus.compressSize))
def this(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long) {
this(loc, uncompressedSizes.map(MapStatus.compressSize), numOutput)
}

override def location: BlockManagerId = loc

override def numberOfOutput: Long = numOutput

override def getSizeForBlock(reduceId: Int): Long = {
MapStatus.decompressSize(compressedSizes(reduceId))
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeLong(numOutput)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
numOutput = in.readLong()
val len = in.readInt()
compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
@@ -143,17 +154,20 @@ private[spark] class HighlyCompressedMapStatus private (
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
private var hugeBlockSizes: Map[Int, Byte])
private var hugeBlockSizes: Map[Int, Byte],
private[this] var numOutput: Long)
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1, null) // For deserialization only
protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only

override def location: BlockManagerId = loc

override def numberOfOutput: Long = numOutput

override def getSizeForBlock(reduceId: Int): Long = {
assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
@@ -168,6 +182,7 @@

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeLong(numOutput)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
out.writeInt(hugeBlockSizes.size)
@@ -179,6 +194,7 @@

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
numOutput = in.readLong()
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
@@ -194,7 +210,10 @@
}

private[spark] object HighlyCompressedMapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
def apply(
loc: BlockManagerId,
uncompressedSizes: Array[Long],
numOutput: Long): HighlyCompressedMapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
@@ -235,6 +254,6 @@ private[spark] object HighlyCompressedMapStatus {
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
hugeBlockSizesArray.toMap)
hugeBlockSizesArray.toMap, numOutput)
}
}
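Because numberOfOutput is written immediately after the block manager location in both CompressedMapStatus and HighlyCompressedMapStatus, writeExternal and readExternal must agree on the field order. A self-contained sketch of that round trip (plain java.io streams and simplified fields, not Spark's Externalizable plumbing):

```scala
import java.io._

object MapStatusSerializationSketch {
  // Write order must match read order: location, numOutput, then size data.
  def write(out: DataOutput, loc: String, numOutput: Long, compressedSizes: Array[Byte]): Unit = {
    out.writeUTF(loc)
    out.writeLong(numOutput)
    out.writeInt(compressedSizes.length)
    out.write(compressedSizes)
  }

  def read(in: DataInput): (String, Long, Array[Byte]) = {
    val loc = in.readUTF()
    val numOutput = in.readLong()
    val sizes = new Array[Byte](in.readInt())
    in.readFully(sizes)
    (loc, numOutput, sizes)
  }

  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    write(new DataOutputStream(buf), "hostA:7337", 42L, Array[Byte](1, 2, 3))
    val (loc, n, sizes) = read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
    println(s"$loc wrote $n records across ${sizes.length} blocks")
  }
}
```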
@@ -70,7 +70,8 @@ private[spark] class SortShuffleWriter[K, V, C](
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths,
writeMetrics.recordsWritten)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
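As with the Java shuffle writers earlier in the diff, the count passed as numberOfOutput here is simply the writer's own recordsWritten metric. A toy sketch of that flow (hypothetical WriteMetrics, not Spark's ShuffleWriteMetrics):

```scala
object WriterMetricsSketch {
  // Hypothetical counter standing in for ShuffleWriteMetrics.recordsWritten.
  final class WriteMetrics {
    private var records = 0L
    def incRecordsWritten(n: Long): Unit = records += n
    def recordsWritten: Long = records
  }

  def main(args: Array[String]): Unit = {
    val metrics = new WriteMetrics
    val data = Seq("a" -> 1, "b" -> 2, "c" -> 3)
    // One increment per record actually written to the shuffle output.
    data.foreach(_ => metrics.incRecordsWritten(1))
    // The writer would then report this count as the MapStatus numberOfOutput.
    println(s"numberOfOutput = ${metrics.recordsWritten}")
  }
}
```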
@@ -233,6 +233,7 @@ public void writeEmptyIterator() throws Exception {
writer.write(Iterators.emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertEquals(0, mapStatus.get().numberOfOutput());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
Expand All @@ -252,6 +253,7 @@ public void writeWithoutSpilling() throws Exception {
writer.write(dataToWrite.iterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertEquals(NUM_PARTITITONS, mapStatus.get().numberOfOutput());
assertTrue(mergedOutputFile.exists());

long sumOfPartitionSizes = 0;
28 changes: 14 additions & 14 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -62,9 +62,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(1000L, 10000L)))
Array(1000L, 10000L), 10))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(10000L, 1000L)))
Array(10000L, 1000L), 10))
val statuses = tracker.getMapSizesByExecutorId(10, 0)
assert(statuses.toSet ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))),
@@ -84,9 +84,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val compressedSize1000 = MapStatus.compressSize(1000L)
val compressedSize10000 = MapStatus.compressSize(10000L)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
Array(compressedSize1000, compressedSize10000), 10))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
Array(compressedSize10000, compressedSize1000), 10))
assert(tracker.containsShuffle(10))
assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty)
assert(0 == tracker.getNumCachedSerializedBroadcast)
@@ -107,9 +107,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val compressedSize1000 = MapStatus.compressSize(1000L)
val compressedSize10000 = MapStatus.compressSize(10000L)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
Array(compressedSize1000, compressedSize1000, compressedSize1000), 10))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
Array(compressedSize10000, compressedSize1000, compressedSize1000), 10))

assert(0 == tracker.getNumCachedSerializedBroadcast)
// As if we had two simultaneous fetch failures
@@ -145,7 +145,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {

val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("a", "hostA", 1000), Array(1000L)))
BlockManagerId("a", "hostA", 1000), Array(1000L), 10))
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
@@ -182,7 +182,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// Message size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0), 0))
val senderAddress = RpcAddress("localhost", 12345)
val rpcCallContext = mock(classOf[RpcCallContext])
when(rpcCallContext.senderAddress).thenReturn(senderAddress)
@@ -216,11 +216,11 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// on hostB with output size 3
tracker.registerShuffle(10, 3)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L)))
Array(2L), 1))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L)))
Array(2L), 1))
tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(3L)))
Array(3L), 1))

// When the threshold is 50%, only host A should be returned as a preferred location
// as it has 4 out of 7 bytes of output.
@@ -260,7 +260,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 0))
}
val senderAddress = RpcAddress("localhost", 12345)
val rpcCallContext = mock(classOf[RpcCallContext])
@@ -309,9 +309,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(size0, size1000, size0, size10000)))
Array(size0, size1000, size0, size10000), 1))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(size10000, size0, size1000, size0)))
Array(size10000, size0, size1000, size0), 1))
assert(tracker.containsShuffle(10))
assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
Seq(
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -391,6 +391,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
assert(mapOutput2.isDefined)
assert(mapOutput1.get.location === mapOutput2.get.location)
assert(mapOutput1.get.getSizeForBlock(0) === mapOutput1.get.getSizeForBlock(0))
assert(mapOutput1.get.numberOfOutput === mapOutput2.get.numberOfOutput)

// register one of the map outputs -- doesn't matter which one
mapOutput1.foreach { case mapStatus =>
@@ -423,17 +423,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// map stage1 completes successfully, with one task on each executor
complete(taskSets(0), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success, makeMapStatus("hostB", 1))
))
// map stage2 completes successfully, with one task on each executor
complete(taskSets(1), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
@@ -2579,7 +2579,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

object DAGSchedulerSuite {
def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), 1)

def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)