636 changes: 360 additions & 276 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,8 +322,14 @@ private[spark] class Executor(
throw new TaskKilledException(killReason.get)
}

logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// The purpose of updating the epoch here is to invalidate executor map output status cache
// in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
// MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
// we don't need to make any special calls here.
if (!isLocal) {
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
}

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
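To make the epoch comment above concrete, here is a minimal sketch of the worker-side invalidation it relies on, using simplified stand-in classes (ToyTrackerWorker and its fields are illustrative assumptions, not Spark's real MapOutputTrackerWorker API): a worker caches fetched map output statuses and drops that cache whenever the driver's epoch advances, whereas the master owns the authoritative statuses and has no epoch-keyed cache to invalidate, which is why local mode can skip the call.

import scala.collection.mutable

// Simplified stand-in for a worker-side map output tracker. The real
// MapOutputTrackerWorker fetches statuses from the driver over RPC.
class ToyTrackerWorker {
  private var epoch = 0L
  // Cached map output sizes previously fetched from the driver, keyed by shuffle id.
  private val statusCache = mutable.Map[Int, Array[Long]]()

  def updateEpoch(newEpoch: Long): Unit = synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      // A newer epoch means some map outputs may have been lost or moved,
      // so any cached view could be stale: drop it and re-fetch lazily.
      statusCache.clear()
    }
  }

  def cache(shuffleId: Int, sizes: Array[Long]): Unit = synchronized {
    statusCache(shuffleId) = sizes
  }

  def cachedShuffles: Set[Int] = synchronized { statusCache.keySet.toSet }
}

object EpochInvalidationDemo extends App {
  val worker = new ToyTrackerWorker
  worker.cache(shuffleId = 10, sizes = Array(1000L, 2000L))
  worker.updateEpoch(1L)                // the driver bumped the epoch after a failure
  assert(worker.cachedShuffles.isEmpty) // the stale cache was invalidated
  println("cache invalidated on epoch bump")
}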
51 changes: 14 additions & 37 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -328,25 +328,14 @@ class DAGScheduler(
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
Review comment (Contributor Author):
This logic is no longer necessary because ShuffleMapTask queries the MapOutputTracker.

// A previously run stage generated partitions for this shuffle, so for each output
// that's still available, copy information about that output location to the new stage
// (so we don't unnecessarily re-compute that data).
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else {
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
@@ -1217,7 +1206,8 @@ class DAGScheduler(
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
shuffleStage.addOutputLoc(smt.partitionId, status)
Review comment (Contributor Author):
In the old code we would incrementally register map outputs in the ShuffleMapStage and then write the entire set of completed map outputs into the MapOutputTracker in one shot upon stage completion (see line 1242 in the old code). Now we write each map output to the tracker incrementally.
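As a toy illustration of the change described in the comment above (the classes below are simplified assumptions, not the real MapOutputTrackerMaster beyond the method names that appear in this diff), registering each map output as its task finishes keeps the tracker's per-shuffle array current, instead of buffering statuses in the stage object and copying them into the tracker in bulk when the stage completes:

import scala.collection.mutable

// A map status is reduced to (executorId, estimated output size) for this sketch.
case class ToyMapStatus(executorId: String, size: Long)

class ToyTrackerMaster {
  // One slot per map partition; null means "output not yet available".
  private val shuffleStatuses = mutable.Map[Int, Array[ToyMapStatus]]()

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    shuffleStatuses(shuffleId) = Array.fill[ToyMapStatus](numMaps)(null)

  // Incremental path: called once per successful ShuffleMapTask.
  def registerMapOutput(shuffleId: Int, mapId: Int, status: ToyMapStatus): Unit =
    shuffleStatuses(shuffleId)(mapId) = status

  def getNumAvailableOutputs(shuffleId: Int): Int =
    shuffleStatuses(shuffleId).count(_ != null)
}

object IncrementalRegistrationDemo extends App {
  val tracker = new ToyTrackerMaster
  tracker.registerShuffle(shuffleId = 0, numMaps = 2)
  // Each task completion updates the tracker directly, so there is no separate
  // per-stage copy of the outputs that has to be flushed at stage completion.
  tracker.registerMapOutput(0, 0, ToyMapStatus("exec-1", 1000L))
  tracker.registerMapOutput(0, 1, ToyMapStatus("exec-2", 2000L))
  assert(tracker.getNumAvailableOutputs(0) == 2)
}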

mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
// Remove the task's partition from pending partitions. This may have already been
// done above, but will not have been done yet in cases where the task attempt was
// from an earlier attempt of the stage (i.e., not the attempt that's currently
@@ -1234,16 +1224,14 @@
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)

// We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
// epoch incremented to refetch them.
// TODO: Only increment the epoch number if this is not the first time
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
Review comment (Contributor):
Is it safer if we increment the epoch number here?

Review comment (Contributor Author):
I need to think about this carefully and maybe make a matrix of possible cases to be sure. My original thought process was something like this:

  • The old code comment says "TODO: Only increment the epoch number if this is not the first time we registered these map outputs", which implies that at least some of the epoch increments here were unnecessary.
  • If we assume that a new, never-before-computed map output won't be requested by executors before it is complete, then we don't need to worry about executors caching incomplete map outputs.
  • I believe that any FetchFailure should end up incrementing the epoch.

That said, the increment here only occurs once per stage completion. It probably doesn't hurt to bump the epoch here, because in the single-stage-at-a-time case we would only be invalidating map outputs that we'll never fetch again anyway. Even if we were unnecessarily invalidating the map output statuses of other concurrent stages, I think the impact would be relatively small (if it did turn out to matter, a sane approach would be an ETag-like mechanism where bumping the epoch doesn't purge the executor-side caches but instead has executors verify a per-stage epoch / counter). Finally, the existing code might be giving us nice eager cleanup of map statuses after stages complete (vs. the cleanup that occurs later when stages or shuffles are fully cleaned up).

I think you're right that this change carries unnecessary / not-fully-understood risks for now, so let me put in an explicit increment here (with an updated comment and a reference to this discussion) in my next push to this PR.
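Purely to illustrate the ETag-like idea floated in the comment above (it is not something this PR implements, and every name below is hypothetical), executors could keep cached statuses across unrelated changes and only re-fetch the shuffles whose own counter moved:

import scala.collection.mutable

// Hypothetical sketch: instead of one global epoch that purges every cached
// status, the driver keeps a counter per shuffle and executors validate
// their cache entries against it before trusting them.
class PerShuffleEpochMaster {
  private val shuffleEpochs = mutable.Map[Int, Long]()
  def bump(shuffleId: Int): Unit =
    shuffleEpochs(shuffleId) = shuffleEpochs.getOrElse(shuffleId, 0L) + 1
  def epochFor(shuffleId: Int): Long = shuffleEpochs.getOrElse(shuffleId, 0L)
}

class PerShuffleEpochWorker(master: PerShuffleEpochMaster) {
  // Cached value tagged with the per-shuffle counter it was fetched under.
  private val cache = mutable.Map[Int, (Long, String)]()

  def put(shuffleId: Int, value: String): Unit =
    cache(shuffleId) = (master.epochFor(shuffleId), value)

  // Return the cached value only if its tag still matches the master's
  // counter; otherwise signal that a re-fetch is required.
  def get(shuffleId: Int): Option[String] = cache.get(shuffleId) match {
    case Some((tag, value)) if tag == master.epochFor(shuffleId) => Some(value)
    case _ => None
  }
}

object ETagSketch extends App {
  val master = new PerShuffleEpochMaster
  val worker = new PerShuffleEpochWorker(master)
  worker.put(1, "statuses for shuffle 1")
  worker.put(2, "statuses for shuffle 2")
  master.bump(1)                    // only shuffle 1 changed
  assert(worker.get(1).isEmpty)     // shuffle 1 must be re-fetched
  assert(worker.get(2).isDefined)   // shuffle 2's cache entry survives
}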

// This call to increment the epoch may not be strictly necessary, but it is retained
// for now in order to minimize the changes in behavior from an earlier version of the
// code. This existing behavior of always incrementing the epoch following any
// successful shuffle map stage completion may have benefits by causing unneeded
// cached map outputs to be cleaned up earlier on executors. In the future we can
// consider removing this call, but this will require some extra investigation.
// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
mapOutputTracker.incrementEpoch()

clearCacheLocs()

@@ -1343,7 +1331,6 @@
}
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

@@ -1393,17 +1380,7 @@

if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
stage.removeOutputsOnExecutor(execId)
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
Review comment (Contributor Author):
This was a potential inefficiency in the old code. If you lost a single executor, we would end up overwriting the MapOutputTracker's state for every ShuffleMapStage. We had to iterate through each task once in stage.removeOutputsOnExecutor and then a second time in stage.outputLocInMapOutputTrackerFormat(), and in both iterations we also had to walk a List for each task.

The new code still needs to scan every task, but it does so in a single pass and doesn't have to walk a separate list per task.

changeEpoch = true)
}
if (shuffleIdToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
Review comment (Contributor Author):
I think the idea behind this branch was to ensure that we always increment the MapOutputTracker epoch upon executor loss. Thus I put an incrementEpoch() call into removeOutputsOnExecutor() itself inside MapOutputTrackerMaster.
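Drawing the last two comments together, here is a rough sketch (with simplified, illustrative types rather than the real MapOutputTrackerMaster internals) of a removeOutputsOnExecutor that walks each shuffle's status array in a single pass and bumps the epoch itself, so callers no longer need a separate incrementEpoch() branch on executor loss:

import scala.collection.mutable

case class SketchMapStatus(executorId: String, size: Long)

class SketchTrackerMaster {
  // One status slot per map partition of each registered shuffle.
  private val shuffleStatuses = mutable.Map[Int, Array[SketchMapStatus]]()
  private var epoch = 0L

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    shuffleStatuses(shuffleId) = Array.fill[SketchMapStatus](numMaps)(null)

  def registerMapOutput(shuffleId: Int, mapId: Int, status: SketchMapStatus): Unit =
    shuffleStatuses(shuffleId)(mapId) = status

  def getEpoch: Long = synchronized { epoch }
  def incrementEpoch(): Unit = synchronized { epoch += 1 }

  // Single pass over every shuffle's status array: null out anything that
  // lived on the lost executor, then bump the epoch so executors refresh
  // their cached statuses, instead of re-registering each stage's outputs
  // wholesale as the old per-stage bookkeeping did.
  def removeOutputsOnExecutor(execId: String): Unit = synchronized {
    for (statuses <- shuffleStatuses.values; mapId <- statuses.indices) {
      if (statuses(mapId) != null && statuses(mapId).executorId == execId) {
        statuses(mapId) = null
      }
    }
    incrementEpoch()
  }
}

object RemoveOutputsDemo extends App {
  val tracker = new SketchTrackerMaster
  tracker.registerShuffle(0, 2)
  tracker.registerMapOutput(0, 0, SketchMapStatus("exec-1", 100L))
  tracker.registerMapOutput(0, 1, SketchMapStatus("exec-2", 200L))
  val epochBefore = tracker.getEpoch
  tracker.removeOutputsOnExecutor("exec-1")
  assert(tracker.getEpoch > epochBefore) // the epoch bump is built into the removal
}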

}
mapOutputTracker.removeOutputsOnExecutor(execId)
clearCacheLocs()
}
} else {
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler

import scala.collection.mutable.HashSet

import org.apache.spark.ShuffleDependency
import org.apache.spark.{MapOutputTrackerMaster, ShuffleDependency, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite

/**
@@ -42,13 +41,12 @@ private[spark] class ShuffleMapStage(
parents: List[Stage],
firstJobId: Int,
callSite: CallSite,
val shuffleDep: ShuffleDependency[_, _, _])
val shuffleDep: ShuffleDependency[_, _, _],
Review comment (Contributor):
It seems we can pass the shuffleId instead of the ShuffleDependency here.

Review comment (Contributor Author):
Good catch. I agree, but with the caveat that we can only clean this up if this isn't functioning as the last strong reference keeping the dependency from being garbage-collected.

Review comment (Contributor Author):
I'm actually going to leave this as-is for now, since I think the reference might be serving a purpose and I want to minimize the scope of changes.

mapOutputTrackerMaster: MapOutputTrackerMaster)
extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

private[this] var _mapStageJobs: List[ActiveJob] = Nil

private[this] var _numAvailableOutputs: Int = 0

/**
* Partitions that either haven't yet been computed, or that were computed on an executor
* that has since been lost, so should be re-computed. This variable is used by the
@@ -60,13 +58,6 @@
*/
val pendingPartitions = new HashSet[Int]

/**
* List of [[MapStatus]] for each partition. The index of the array is the map partition id,
* and each value in the array is the list of possible [[MapStatus]] for a partition
* (a single task might run multiple times).
*/
private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)

override def toString: String = "ShuffleMapStage " + id

/**
@@ -88,69 +79,18 @@
/**
* Number of partitions that have shuffle outputs.
* When this reaches [[numPartitions]], this map stage is ready.
* This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`.
*/
def numAvailableOutputs: Int = _numAvailableOutputs
def numAvailableOutputs: Int = mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)

/**
* Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
* This should be the same as `outputLocs.contains(Nil)`.
*/
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
def isAvailable: Boolean = numAvailableOutputs == numPartitions

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
override def findMissingPartitions(): Seq[Int] = {
val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
assert(missing.size == numPartitions - _numAvailableOutputs,
s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
missing
}

def addOutputLoc(partition: Int, status: MapStatus): Unit = {
val prevList = outputLocs(partition)
outputLocs(partition) = status :: prevList
if (prevList == Nil) {
_numAvailableOutputs += 1
}
}

def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location == bmAddress)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
_numAvailableOutputs -= 1
}
}

/**
* Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
* value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,
* that position is filled with null.
*/
def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
outputLocs.map(_.headOption.orNull)
}

/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
* registered with this execId.
*/
def removeOutputsOnExecutor(execId: String): Unit = {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location.executorId == execId)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
_numAvailableOutputs -= 1
}
}
if (becameUnavailable) {
logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
this, execId, _numAvailableOutputs, numPartitions, isAvailable))
}
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -129,7 +129,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](

var backend: SchedulerBackend = null

val mapOutputTracker = SparkEnv.get.mapOutputTracker
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

private var schedulableBuilder: SchedulableBuilder = null
// default scheduler is FIFO
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -138,21 +138,21 @@ class MapOutputTrackerSuite extends SparkFunSuite {
slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)

masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
// This is expected to fail because no outputs have been registered for the shuffle.
intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) }

val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getMapSizesByExecutorId(10, 0) ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
assert(0 == masterTracker.getNumCachedSerializedBroadcast)

val masterTrackerEpochBeforeLossOfMapOutput = masterTracker.getEpoch
masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
masterTracker.incrementEpoch()
assert(masterTracker.getEpoch > masterTrackerEpochBeforeLossOfMapOutput)
slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) }

3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -359,6 +359,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleHandle = manager.registerShuffle(0, 1, shuffleDep)
mapTrackerMaster.registerShuffle(0, 1)

// first attempt -- its successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
@@ -393,7 +394,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

// register one of the map outputs -- doesn't matter which one
mapOutput1.foreach { case mapStatus =>
mapTrackerMaster.registerMapOutputs(0, Array(mapStatus))
mapTrackerMaster.registerMapOutput(0, 0, mapStatus)
}

val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -86,7 +86,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
sc = new SparkContext(conf)
val scheduler = mock[TaskSchedulerImpl]
when(scheduler.sc).thenReturn(sc)
when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker)
when(scheduler.mapOutputTracker).thenReturn(
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])
scheduler
}
