@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, 0);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -167,8 +167,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

@VisibleForTesting
@@ -248,8 +248,7 @@ void closeAndWriteOutput() throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

@VisibleForTesting
@@ -23,9 +23,6 @@ package org.apache.spark
* @param shuffleId ID of the shuffle
* @param bytesByPartitionId approximate number of output bytes for each map output partition
* (may be inexact due to use of compressed map statuses)
* @param recordsByPartitionId number of output records for each map output partition
*/
private[spark] class MapOutputStatistics(
val shuffleId: Int,
val bytesByPartitionId: Array[Long],
val recordsByPartitionId: Array[Long])
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])

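The statistics object returned to the driver now carries only the shuffle ID and the per-partition byte sizes. Below is a minimal sketch of the slimmed-down shape and one way a caller might consume it; the example object and values are illustrative, not Spark source.

```scala
// Minimal sketch, assuming only this simplified shape: MapOutputStatistics now
// carries just the shuffle ID and the (possibly inexact) per-partition byte sizes.
class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])

object MapOutputStatisticsSketch {
  def main(args: Array[String]): Unit = {
    val stats = new MapOutputStatistics(10, Array(1000L, 0L, 2500L))
    // Downstream logic (e.g. post-shuffle partition coalescing) now reasons only
    // about bytes; per-partition record counts are no longer available here.
    val totalBytes = stats.bytesByPartitionId.sum
    println(s"shuffle ${stats.shuffleId}: $totalBytes bytes over ${stats.bytesByPartitionId.length} partitions")
  }
}
```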
9 changes: 2 additions & 7 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -522,19 +522,17 @@ private[spark] class MapOutputTrackerMaster(
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
val recordsByMapTask = new Array[Long](statuses.length)

val parallelAggThreshold = conf.get(
SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
val parallelism = math.min(
Runtime.getRuntime.availableProcessors(),
statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
if (parallelism <= 1) {
statuses.zipWithIndex.foreach { case (s, index) =>
for (s <- statuses) {
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
}
recordsByMapTask(index) = s.numberOfOutput
}
} else {
val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
@@ -551,11 +549,8 @@ private[spark] class MapOutputTrackerMaster(
} finally {
threadPool.shutdown()
}
statuses.zipWithIndex.foreach { case (s, index) =>
recordsByMapTask(index) = s.numberOfOutput
}
}
new MapOutputStatistics(dep.shuffleId, totalSizes, recordsByMapTask)
new MapOutputStatistics(dep.shuffleId, totalSizes)
}
}

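With recordsByMapTask gone, getStatistics only has to sum the estimated block sizes per reduce partition. A small sketch of that sequential aggregation path, using a stand-in Status trait rather than the real MapStatus:

```scala
// Illustrative sketch of the sequential aggregation path after this hunk: only the
// per-reducer byte sizes are summed; the per-map-task record counter is gone.
// The Status trait is a stand-in for the real MapStatus, not Spark source.
object AggregateSizesSketch {
  trait Status { def getSizeForBlock(reduceId: Int): Long }

  def aggregate(statuses: Seq[Status], numPartitions: Int): Array[Long] = {
    val totalSizes = new Array[Long](numPartitions)
    for (s <- statuses; i <- 0 until numPartitions) {
      totalSizes(i) += s.getSizeForBlock(i) // estimated bytes per reduce partition
    }
    totalSizes
  }

  def main(args: Array[String]): Unit = {
    val statuses = Seq.tabulate(3)(m => new Status {
      def getSizeForBlock(reduceId: Int): Long = (m + 1) * 100L
    })
    println(aggregate(statuses, numPartitions = 4).mkString(", ")) // 600, 600, 600, 600
  }
}
```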
43 changes: 12 additions & 31 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -31,8 +31,7 @@ import org.apache.spark.util.Utils

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on, the sizes of outputs for each reducer, and the number of outputs of the map task,
* for passing on to the reduce tasks.
* task ran on, the sizes of outputs for each reducer, for passing on to the reduce tasks.
*/
private[spark] sealed trait MapStatus {
/** Location where this task was run. */
@@ -45,23 +44,18 @@ private[spark] sealed trait MapStatus {
* necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
*/
def getSizeForBlock(reduceId: Int): Long

/**
* The number of outputs for the map task.
*/
def numberOfOutput: Long
}


private[spark] object MapStatus {

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long): MapStatus = {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
HighlyCompressedMapStatus(loc, uncompressedSizes, numOutput)
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes, numOutput)
new CompressedMapStatus(loc, uncompressedSizes)
}
}

@@ -104,34 +98,29 @@ private[spark] object MapStatus {
*/
private[spark] class CompressedMapStatus(
private[this] var loc: BlockManagerId,
private[this] var compressedSizes: Array[Byte],
private[this] var numOutput: Long)
private[this] var compressedSizes: Array[Byte])
extends MapStatus with Externalizable {

protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1) // For deserialization only
protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only

def this(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long) {
this(loc, uncompressedSizes.map(MapStatus.compressSize), numOutput)
def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
this(loc, uncompressedSizes.map(MapStatus.compressSize))
}

override def location: BlockManagerId = loc

override def numberOfOutput: Long = numOutput

override def getSizeForBlock(reduceId: Int): Long = {
MapStatus.decompressSize(compressedSizes(reduceId))
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeLong(numOutput)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
numOutput = in.readLong()
val len = in.readInt()
compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
@@ -154,20 +143,17 @@ private[spark] class HighlyCompressedMapStatus private (
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
private var hugeBlockSizes: Map[Int, Byte],
private[this] var numOutput: Long)
private var hugeBlockSizes: Map[Int, Byte])
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only
protected def this() = this(null, -1, null, -1, null) // For deserialization only

override def location: BlockManagerId = loc

override def numberOfOutput: Long = numOutput

override def getSizeForBlock(reduceId: Int): Long = {
assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
@@ -182,7 +168,6 @@

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeLong(numOutput)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
out.writeInt(hugeBlockSizes.size)
@@ -194,7 +179,6 @@

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
numOutput = in.readLong()
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
@@ -210,10 +194,7 @@ }
}

private[spark] object HighlyCompressedMapStatus {
def apply(
loc: BlockManagerId,
uncompressedSizes: Array[Long],
numOutput: Long): HighlyCompressedMapStatus = {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
@@ -254,6 +235,6 @@ private[spark] object HighlyCompressedMapStatus {
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
hugeBlockSizesArray.toMap, numOutput)
hugeBlockSizesArray.toMap)
}
}
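One consequence of dropping numOutput is that the externalized form of a map status shrinks by one long and changes layout, so serialized statuses from before and after this change are not wire-compatible. A hedged sketch of the new CompressedMapStatus write order, with writeLocation standing in for loc.writeExternal(out):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hedged sketch of the externalized layout of CompressedMapStatus after this diff:
// location, then the length-prefixed compressed-size array. The numOutput long that
// previously sat between them is gone. writeLocation is a stand-in, not Spark code.
object CompressedStatusLayoutSketch {
  def writeLocation(out: ObjectOutputStream): Unit = out.writeUTF("exec-hostA:12345") // stand-in

  def main(args: Array[String]): Unit = {
    val compressedSizes = Array[Byte](10, 0, 27)
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    writeLocation(out)                    // 1. block manager location
    out.writeInt(compressedSizes.length)  // 2. number of reduce partitions
    out.write(compressedSizes)            // 3. one compressed size byte per partition
    out.close()
    println(s"serialized ${buffer.size()} bytes")
  }
}
```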
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths,
writeMetrics.recordsWritten)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
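Each shuffle writer touched by this diff now finishes the same way: it reports only its block manager location and the per-partition lengths, without threading writeMetrics.recordsWritten into the map status. A minimal sketch of that final step, using stand-in types rather than the Spark classes:

```scala
// Minimal sketch, assuming stand-in types: after writing and committing the index
// file, the writer reports only its location and per-partition byte lengths.
object ShuffleWriterSketch {
  case class BlockManagerId(executorId: String, host: String, port: Int)
  case class MapStatusLite(location: BlockManagerId, partitionLengths: Array[Long])

  def finishWrite(serverId: BlockManagerId, partitionLengths: Array[Long]): MapStatusLite =
    MapStatusLite(serverId, partitionLengths) // two-argument construction, no record count

  def main(args: Array[String]): Unit = {
    val status = finishWrite(BlockManagerId("exec-1", "hostA", 7337), Array(2L, 0L, 3L))
    println(s"reported ${status.partitionLengths.sum} bytes from ${status.location.host}")
  }
}
```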
@@ -233,7 +233,6 @@ public void writeEmptyIterator() throws Exception {
writer.write(Iterators.emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertEquals(0, mapStatus.get().numberOfOutput());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
@@ -253,7 +252,6 @@ public void writeWithoutSpilling() throws Exception {
writer.write(dataToWrite.iterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertEquals(NUM_PARTITITONS, mapStatus.get().numberOfOutput());
assertTrue(mergedOutputFile.exists());

long sumOfPartitionSizes = 0;
28 changes: 14 additions & 14 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -62,9 +62,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(1000L, 10000L), 10))
Array(1000L, 10000L)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(10000L, 1000L), 10))
Array(10000L, 1000L)))
val statuses = tracker.getMapSizesByExecutorId(10, 0)
assert(statuses.toSet ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))),
@@ -84,9 +84,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val compressedSize1000 = MapStatus.compressSize(1000L)
val compressedSize10000 = MapStatus.compressSize(10000L)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000), 10))
Array(compressedSize1000, compressedSize10000)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000), 10))
Array(compressedSize10000, compressedSize1000)))
assert(tracker.containsShuffle(10))
assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty)
assert(0 == tracker.getNumCachedSerializedBroadcast)
@@ -107,9 +107,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val compressedSize1000 = MapStatus.compressSize(1000L)
val compressedSize10000 = MapStatus.compressSize(10000L)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000), 10))
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000), 10))
Array(compressedSize10000, compressedSize1000, compressedSize1000)))

assert(0 == tracker.getNumCachedSerializedBroadcast)
// As if we had two simultaneous fetch failures
@@ -145,7 +145,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {

val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("a", "hostA", 1000), Array(1000L), 10))
BlockManagerId("a", "hostA", 1000), Array(1000L)))
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
@@ -182,7 +182,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// Message size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0), 0))
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
val senderAddress = RpcAddress("localhost", 12345)
val rpcCallContext = mock(classOf[RpcCallContext])
when(rpcCallContext.senderAddress).thenReturn(senderAddress)
@@ -216,11 +216,11 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// on hostB with output size 3
tracker.registerShuffle(10, 3)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L), 1))
Array(2L)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L), 1))
Array(2L)))
tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(3L), 1))
Array(3L)))

// When the threshold is 50%, only host A should be returned as a preferred location
// as it has 4 out of 7 bytes of output.
@@ -260,7 +260,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 0))
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
}
val senderAddress = RpcAddress("localhost", 12345)
val rpcCallContext = mock(classOf[RpcCallContext])
@@ -309,9 +309,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(size0, size1000, size0, size10000), 1))
Array(size0, size1000, size0, size10000)))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(size10000, size0, size1000, size0), 1))
Array(size10000, size0, size1000, size0)))
assert(tracker.containsShuffle(10))
assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
Seq(
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -391,7 +391,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
assert(mapOutput2.isDefined)
assert(mapOutput1.get.location === mapOutput2.get.location)
assert(mapOutput1.get.getSizeForBlock(0) === mapOutput1.get.getSizeForBlock(0))
assert(mapOutput1.get.numberOfOutput === mapOutput2.get.numberOfOutput)

// register one of the map outputs -- doesn't matter which one
mapOutput1.foreach { case mapStatus =>
@@ -445,17 +445,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// map stage1 completes successfully, with one task on each executor
complete(taskSets(0), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
(Success, makeMapStatus("hostB", 1))
))
// map stage2 completes successfully, with one task on each executor
complete(taskSets(1), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
@@ -2857,7 +2857,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

object DAGSchedulerSuite {
def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), 1)
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))

def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)