Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
636 changes: 360 additions & 276 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import org.apache.spark.util.random.SamplingUtils
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*
* Note that, partitioner must be deterministic, i.e. it must return the same partition id given
* the same partition key.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,14 @@ private[spark] class Executor(
throw new TaskKilledException(killReason.get)
}

logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// The purpose of updating the epoch here is to invalidate executor map output status cache
// in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
// MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
// we don't need to make any special calls here.
if (!isLocal) {
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
}

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
Expand Down
21 changes: 20 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,22 @@ import org.apache.spark.{Partition, TaskContext}

/**
* An RDD that applies the provided function to every partition of the parent RDD.
*
* @param prev the parent RDD.
* @param f The function used to map a tuple of (TaskContext, partition index, input iterator) to
* an output iterator.
* @param preservesPartitioning Whether the input function preserves the partitioner, which should
* be `false` unless `prev` is a pair RDD and the input function
* doesn't modify the keys.
* @param isOrderSensitive whether or not the function is order-sensitive. If it's order
* sensitive, it may return totally different result when the input order
* is changed. Mostly stateful functions are order-sensitive.
*/
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
preservesPartitioning: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev) {

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
Expand All @@ -41,4 +52,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
super.clearDependencies()
prev = null
}

override protected def getOutputDeterministicLevel = {
if (isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
DeterministicLevel.INDETERMINATE
} else {
super.getOutputDeterministicLevel
}
}
}
100 changes: 94 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,9 @@ abstract class RDD[T: ClassTag](

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
Expand Down Expand Up @@ -806,16 +807,21 @@ abstract class RDD[T: ClassTag](
* serializable and don't require closure cleaning.
*
* @param preservesPartitioning indicates whether the input function preserves the partitioner,
* which should be `false` unless this is a pair RDD and the input function doesn't modify
* the keys.
* which should be `false` unless this is a pair RDD and the input
* function doesn't modify the keys.
* @param isOrderSensitive whether or not the function is order-sensitive. If it's order
* sensitive, it may return totally different result when the input order
* is changed. Mostly stateful functions are order-sensitive.
*/
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
preservesPartitioning: Boolean = false,
isOrderSensitive: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning)
preservesPartitioning = preservesPartitioning,
isOrderSensitive = isOrderSensitive)
}

/**
Expand Down Expand Up @@ -1634,6 +1640,16 @@ abstract class RDD[T: ClassTag](
}
}

/**
* Return whether this RDD is reliably checkpointed and materialized.
*/
private[rdd] def isReliablyCheckpointed: Boolean = {
checkpointData match {
case Some(reliable: ReliableRDDCheckpointData[_]) if reliable.isCheckpointed => true
case _ => false
}
}

/**
* Gets the name of the directory to which this RDD was checkpointed.
* This is not defined if the RDD is checkpointed locally.
Expand Down Expand Up @@ -1838,6 +1854,63 @@ abstract class RDD[T: ClassTag](
def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
}

/**
* Returns the deterministic level of this RDD's output. Please refer to [[DeterministicLevel]]
* for the definition.
*
* By default, an reliably checkpointed RDD, or RDD without parents(root RDD) is DETERMINATE. For
* RDDs with parents, we will generate a deterministic level candidate per parent according to
* the dependency. The deterministic level of the current RDD is the deterministic level
* candidate that is deterministic least. Please override [[getOutputDeterministicLevel]] to
* provide custom logic of calculating output deterministic level.
*/
// TODO: make it public so users can set deterministic level to their custom RDDs.
// TODO: this can be per-partition. e.g. UnionRDD can have different deterministic level for
// different partitions.
private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
if (isReliablyCheckpointed) {
DeterministicLevel.DETERMINATE
} else {
getOutputDeterministicLevel
}
}

@DeveloperApi
protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
val deterministicLevelCandidates = dependencies.map {
// The shuffle is not really happening, treat it like narrow dependency and assume the output
// deterministic level of current RDD is same as parent.
case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) =>
dep.rdd.outputDeterministicLevel

case dep: ShuffleDependency[_, _, _] =>
if (dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
// If map output was indeterminate, shuffle output will be indeterminate as well
DeterministicLevel.INDETERMINATE
} else if (dep.keyOrdering.isDefined && dep.aggregator.isDefined) {
// if aggregator specified (and so unique keys) and key ordering specified - then
// consistent ordering.
DeterministicLevel.DETERMINATE
} else {
// In Spark, the reducer fetches multiple remote shuffle blocks at the same time, and
// the arrival order of these shuffle blocks are totally random. Even if the parent map
// RDD is DETERMINATE, the reduce RDD is always UNORDERED.
DeterministicLevel.UNORDERED
}

// For narrow dependency, assume the output deterministic level of current RDD is same as
// parent.
case dep => dep.rdd.outputDeterministicLevel
}

if (deterministicLevelCandidates.isEmpty) {
// By default we assume the root RDD is determinate.
DeterministicLevel.DETERMINATE
} else {
deterministicLevelCandidates.maxBy(_.id)
}
}
}


Expand Down Expand Up @@ -1891,3 +1964,18 @@ object RDD {
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
}
}

/**
* The deterministic level of RDD's output (i.e. what `RDD#compute` returns). This explains how
* the output will diff when Spark reruns the tasks for the RDD. There are 3 deterministic levels:
* 1. DETERMINATE: The RDD output is always the same data set in the same order after a rerun.
* 2. UNORDERED: The RDD output is always the same data set but the order can be different
* after a rerun.
* 3. INDETERMINATE. The RDD output can be different after a rerun.
*
* Note that, the output of an RDD usually relies on the parent RDDs. When the parent RDD's output
* is INDETERMINATE, it's very likely the RDD's output is also INDETERMINATE.
*/
private[spark] object DeterministicLevel extends Enumeration {
val DETERMINATE, UNORDERED, INDETERMINATE = Value
}
110 changes: 72 additions & 38 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
Expand Down Expand Up @@ -328,25 +328,14 @@ class DAGScheduler(
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// A previously run stage generated partitions for this shuffle, so for each output
// that's still available, copy information about that output location to the new stage
// (so we don't unnecessarily re-compute that data).
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else {
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
Expand Down Expand Up @@ -1240,7 +1229,8 @@ class DAGScheduler(
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
shuffleStage.addOutputLoc(smt.partitionId, status)
mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
// Remove the task's partition from pending partitions. This may have already been
// done above, but will not have been done yet in cases where the task attempt was
// from an earlier attempt of the stage (i.e., not the attempt that's currently
Expand All @@ -1257,16 +1247,14 @@ class DAGScheduler(
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)

// We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
// epoch incremented to refetch them.
// TODO: Only increment the epoch number if this is not the first time
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
// This call to increment the epoch may not be strictly necessary, but it is retained
// for now in order to minimize the changes in behavior from an earlier version of the
// code. This existing behavior of always incrementing the epoch following any
// successful shuffle map stage completion may have benefits by causing unneeded
// cached map outputs to be cleaned up earlier on executors. In the future we can
// consider removing this call, but this will require some extra investigation.
// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
mapOutputTracker.incrementEpoch()

clearCacheLocs()

Expand Down Expand Up @@ -1344,6 +1332,63 @@ class DAGScheduler(
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued) {
// If the map stage is INDETERMINATE, which means the map tasks may return
// different result when re-try, we need to re-try all the tasks of the failed
// stage and its succeeding stages, because the input data will be changed after the
// map tasks are re-tried.
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
// It's a little tricky to find all the succeeding stages of `failedStage`, because
// each stage only know its parents not children. Here we traverse the stages from
// the leaf nodes (the result stages of active jobs), and rollback all the stages
// in the stage chains that connect to the `failedStage`. To speed up the stage
// traversing, we collect the stages to rollback first. If a stage needs to
// rollback, all its succeeding stages need to rollback to.
val stagesToRollback = scala.collection.mutable.HashSet(failedStage)

def collectStagesToRollback(stageChain: List[Stage]): Unit = {
if (stagesToRollback.contains(stageChain.head)) {
stageChain.drop(1).foreach(s => stagesToRollback += s)
} else {
stageChain.head.parents.foreach { s =>
collectStagesToRollback(s :: stageChain)
}
}
}

def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
}

activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))

stagesToRollback.foreach {
case mapStage: ShuffleMapStage =>
val numMissingPartitions = mapStage.findMissingPartitions().length
if (numMissingPartitions < mapStage.numTasks) {
// TODO: support to rollback shuffle files.
// Currently the shuffle writing is "first write wins", so we can't re-run a
// shuffle map stage and overwrite existing shuffle files. We have to finish
// SPARK-8029 first.
abortStage(mapStage, generateErrorMessage(mapStage), None)
}

case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
}

case _ =>
}
}

// We expect one executor failure to trigger many FetchFailures in rapid succession,
// but all of those task failures can typically be handled by a single resubmission of
// the failed stage. We avoid flooding the scheduler's event queue with resubmit
Expand All @@ -1367,7 +1412,6 @@ class DAGScheduler(
}
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

Expand Down Expand Up @@ -1416,17 +1460,7 @@ class DAGScheduler(

if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
stage.removeOutputsOnExecutor(execId)
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
}
if (shuffleIdToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
}
mapOutputTracker.removeOutputsOnExecutor(execId)
clearCacheLocs()
}
} else {
Expand Down
Loading