diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index c940cb25d478..515237558fd8 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -33,6 +33,9 @@ import org.apache.spark.util.random.SamplingUtils
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
+ *
+ * Note that the partitioner must be deterministic, i.e. it must return the same partition id
+ * given the same partition key.
  */
 abstract class Partitioner extends Serializable {
   def numPartitions: Int
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 904d9c025629..aa61997122cf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -32,12 +32,16 @@ import org.apache.spark.{Partition, TaskContext}
  *                              doesn't modify the keys.
  * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage
  *                      containing at least one RDDBarrier shall be turned into a barrier stage.
+ * @param isOrderSensitive whether or not the function is order-sensitive. If it is
+ *                         order-sensitive, it may return a totally different result when the
+ *                         input order is changed. Most stateful functions are order-sensitive.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
-    isFromBarrier: Boolean = false)
+    isFromBarrier: Boolean = false,
+    isOrderSensitive: Boolean = false)
   extends RDD[U](prev) {

   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -54,4 +58,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](

   @transient protected lazy override val isBarrier_ : Boolean =
     isFromBarrier || dependencies.exists(_.rdd.isBarrier())
+
+  override protected def getOutputDeterministicLevel = {
+    if (isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+      DeterministicLevel.INDETERMINATE
+    } else {
+      super.getOutputDeterministicLevel
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea895bb3412e..61ad6dfdb221 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -462,8 +462,9 @@ abstract class RDD[T: ClassTag](
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
-          new HashPartitioner(numPartitions)),
+        new ShuffledRDD[Int, T, T](
+          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
+          new HashPartitioner(numPartitions)),
         numPartitions,
         partitionCoalescer).values
     } else {
@@ -807,16 +808,21 @@ abstract class RDD[T: ClassTag](
    * serializable and don't require closure cleaning.
    *
    * @param preservesPartitioning indicates whether the input function preserves the partitioner,
-   * which should be `false` unless this is a pair RDD and the input function doesn't modify
-   * the keys.
+   *                              which should be `false` unless this is a pair RDD and the input
+   *                              function doesn't modify the keys.
+   * @param isOrderSensitive whether or not the function is order-sensitive. If it is
+   *                         order-sensitive, it may return a totally different result when the
+   *                         input order is changed. Most stateful functions are order-sensitive.
    */
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+      preservesPartitioning: Boolean = false,
+      isOrderSensitive: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
       (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
-      preservesPartitioning)
+      preservesPartitioning = preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
   }

   /**
@@ -1636,6 +1642,16 @@ abstract class RDD[T: ClassTag](
     }
   }

+  /**
+   * Return whether this RDD is reliably checkpointed and materialized.
+   */
+  private[rdd] def isReliablyCheckpointed: Boolean = {
+    checkpointData match {
+      case Some(reliable: ReliableRDDCheckpointData[_]) if reliable.isCheckpointed => true
+      case _ => false
+    }
+  }
+
   /**
    * Gets the name of the directory to which this RDD was checkpointed.
    * This is not defined if the RDD is checkpointed locally.
@@ -1873,6 +1889,63 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the deterministic level of this RDD's output. Please refer to [[DeterministicLevel]]
+   * for the definition.
+   *
+   * By default, a reliably checkpointed RDD, or an RDD without parents (a root RDD), is
+   * DETERMINATE. For RDDs with parents, we generate a deterministic level candidate per parent
+   * according to the dependency. The deterministic level of the current RDD is the least
+   * deterministic of these candidates. Please override [[getOutputDeterministicLevel]] to
+   * provide custom logic for calculating the output deterministic level.
+   */
+  // TODO: make it public so users can set the deterministic level of their custom RDDs.
+  // TODO: this can be per-partition, e.g. UnionRDD can have a different deterministic level for
+  // different partitions.
+  private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
+    if (isReliablyCheckpointed) {
+      DeterministicLevel.DETERMINATE
+    } else {
+      getOutputDeterministicLevel
+    }
+  }
+
+  @DeveloperApi
+  protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
+    val deterministicLevelCandidates = dependencies.map {
+      // The shuffle is not really happening; treat it like a narrow dependency and assume the
+      // output deterministic level of the current RDD is the same as the parent's.
+      case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) =>
+        dep.rdd.outputDeterministicLevel
+
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
+          // If the map output is indeterminate, the shuffle output will be indeterminate as well.
+          DeterministicLevel.INDETERMINATE
+        } else if (dep.keyOrdering.isDefined && dep.aggregator.isDefined) {
+          // If an aggregator is specified (so keys are unique) and a key ordering is specified,
+          // the output ordering is consistent.
+          DeterministicLevel.DETERMINATE
+        } else {
+          // In Spark, the reducer fetches multiple remote shuffle blocks at the same time, and
+          // the arrival order of these shuffle blocks is totally random.
+          // Even if the parent map RDD is DETERMINATE, the reduce RDD is always UNORDERED.
+          DeterministicLevel.UNORDERED
+        }
+
+      // For a narrow dependency, assume the output deterministic level of the current RDD is
+      // the same as the parent's.
+      case dep => dep.rdd.outputDeterministicLevel
+    }
+
+    if (deterministicLevelCandidates.isEmpty) {
+      // By default we assume the root RDD is determinate.
+      DeterministicLevel.DETERMINATE
+    } else {
+      deterministicLevelCandidates.maxBy(_.id)
+    }
+  }
 }
@@ -1926,3 +1999,18 @@ object RDD {
     new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
   }
 }
+
+/**
+ * The deterministic level of an RDD's output (i.e. what `RDD#compute` returns). This describes
+ * how the output can differ when Spark reruns the tasks for the RDD. There are 3 deterministic
+ * levels:
+ * 1. DETERMINATE: The RDD output is always the same data set in the same order after a rerun.
+ * 2. UNORDERED: The RDD output is always the same data set but the order can be different
+ *    after a rerun.
+ * 3. INDETERMINATE: The RDD output can be different after a rerun.
+ *
+ * Note that the output of an RDD usually relies on its parent RDDs. When a parent RDD's output
+ * is INDETERMINATE, it's very likely that the RDD's output is also INDETERMINATE.
+ */
+private[spark] object DeterministicLevel extends Enumeration {
+  val DETERMINATE, UNORDERED, INDETERMINATE = Value
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fec6558f412d..50c91da8b13d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -40,7 +40,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -1487,6 +1487,63 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return a
+              // different result when retried, we need to retry all the tasks of the failed
+              // stage and its succeeding stages, because their input data will change after the
+              // map tasks are retried.
+              // Note that if the map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be determinate, so the input data of the reducers will not change
+              // even if the map tasks are retried.
+              if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
+                // It's a little tricky to find all the succeeding stages of `failedStage`,
+                // because each stage only knows its parents, not its children. Here we traverse
+                // the stages from the leaf nodes (the result stages of active jobs), and roll
+                // back all the stages in the stage chains that connect to the `failedStage`. To
+                // speed up the stage traversal, we collect the stages to roll back first. If a
+                // stage needs to roll back, all its succeeding stages need to roll back too.
+                val stagesToRollback = scala.collection.mutable.HashSet(failedStage)
+
+                def collectStagesToRollback(stageChain: List[Stage]): Unit = {
+                  if (stagesToRollback.contains(stageChain.head)) {
+                    stageChain.drop(1).foreach(s => stagesToRollback += s)
+                  } else {
+                    stageChain.head.parents.foreach { s =>
+                      collectStagesToRollback(s :: stageChain)
+                    }
+                  }
+                }
+
+                def generateErrorMessage(stage: Stage): String = {
+                  "A shuffle map stage with indeterminate output failed and was retried. " +
+                    s"However, Spark cannot rollback the $stage to re-process the input data, " +
+                    "and has to fail this job. Please eliminate the indeterminacy by " +
+                    "checkpointing the RDD before repartition and try again."
+                }
+
+                activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
+
+                stagesToRollback.foreach {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      // TODO: support rolling back shuffle files.
+                      // Currently shuffle writing is "first write wins", so we can't re-run a
+                      // shuffle map stage and overwrite existing shuffle files. We have to
+                      // finish SPARK-8029 first.
+                      abortStage(mapStage, generateErrorMessage(mapStage), None)
+                    }
+
+                  case resultStage: ResultStage if resultStage.activeJob.isDefined =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support rolling back result tasks.
+                      abortStage(resultStage, generateErrorMessage(resultStage), None)
+                    }
+
+                  case _ =>
+                }
+              }
+
               // We expect one executor failure to trigger many FetchFailures in rapid succession,
               // but all of those task failures can typically be handled by a single resubmission of
               // the failed stage. We avoid flooding the scheduler's event queue with resubmit
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e0202fe703f8..4e87deb136df 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.config
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -57,6 +57,20 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)

 }

+class MyCheckpointRDD(
+    sc: SparkContext,
+    numPartitions: Int,
+    dependencies: List[Dependency[_]],
+    locations: Seq[Seq[String]] = Nil,
+    @(transient @param) tracker: MapOutputTrackerMaster = null,
+    indeterminate: Boolean = false)
+  extends MyRDD(sc, numPartitions, dependencies, locations, tracker, indeterminate) {
+
+  // Allow doCheckpoint() on this RDD: compute() returns an empty iterator instead of throwing.
+  override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+    Iterator.empty
+}
+
 /**
  * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
  * preferredLocations (if any) that are passed to them. They are deliberately not executable
@@ -71,7 +85,8 @@ class MyRDD(
     numPartitions: Int,
     dependencies: List[Dependency[_]],
     locations: Seq[Seq[String]] = Nil,
-    @(transient @param) tracker: MapOutputTrackerMaster = null)
+    @(transient @param) tracker: MapOutputTrackerMaster = null,
+    indeterminate: Boolean = false)
   extends RDD[(Int, Int)](sc, dependencies) with Serializable {

   override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
@@ -81,6 +96,10 @@ class MyRDD(
       override def index: Int = i
     }).toArray

+  override protected def getOutputDeterministicLevel = {
+    if (indeterminate) DeterministicLevel.INDETERMINATE else super.getOutputDeterministicLevel
+  }
+
   override def getPreferredLocations(partition: Partition): Seq[String] = {
     if (locations.isDefinedAt(partition.index)) {
       locations(partition.index)
@@ -2634,6 +2653,152 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(countSubmittedMapStageAttempts() === 2)
   }

+  test("SPARK-23207: retry all the succeeding stages when the map stage is indeterminate") {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true)
+
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+    val shuffleId1 = shuffleDep1.shuffleId
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(2))
+    val shuffleId2 = shuffleDep2.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep2), tracker = mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    // Finish the first shuffle map stage.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === Some(Seq.empty))
+
+    // Finish the second shuffle map stage.
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostC", 2)),
+      (Success, makeMapStatus("hostD", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === Some(Seq.empty))
+
+    // The first task of the final stage failed with a fetch failure.
+    runEvent(makeCompletionEvent(
+      taskSets(2).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
+      null))
+
+    val failedStages = scheduler.failedStages.toSeq
+    assert(failedStages.length == 2)
+    // Shuffle blocks of "hostC" are lost, so the first task of `shuffleMapRdd2` needs to be
+    // retried.
+    assert(failedStages.collect {
+      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
+    }.head.findMissingPartitions() == Seq(0))
+    // The result stage is still waiting for its 2 tasks to complete.
+    assert(failedStages.collect {
+      case stage: ResultStage => stage
+    }.head.findMissingPartitions() == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // The first task of `shuffleMapRdd2` failed with a fetch failure.
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
+      null))
+
+    // The job should fail because Spark can't roll back the shuffle map stage.
+    assert(failure != null && failure.getMessage.contains("Spark cannot rollback"))
+  }
+
+  private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
+    val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(taskSets.length - 1, 0, numShufflePartitions = 2)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty))
+
+    // Finish the first task of the result stage.
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(0), Success, 42,
+      Seq.empty, createFakeTaskInfoWithId(0)))
+
+    // Fail the second task with FetchFailed.
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
+      null))
+
+    // The job should fail because Spark can't roll back the result stage.
+    assert(failure != null && failure.getMessage.contains("Spark cannot rollback"))
+  }
+
+  test("SPARK-23207: cannot roll back a result stage") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil, indeterminate = true)
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
+  test("SPARK-23207: local checkpoint fails to roll back (checkpointed before)") {
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.localCheckpoint()
+    shuffleMapRdd.doCheckpoint()
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
+  test("SPARK-23207: local checkpoint fails to roll back (checkpointing now)") {
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.localCheckpoint()
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
+  private def assertResultStageNotRolledBack(mapRdd: MyRDD): Unit = {
+    val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(taskSets.length - 1, 0, numShufflePartitions = 2)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty))
+
+    // Finish the first task of the result stage.
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(0), Success, 42,
+      Seq.empty, createFakeTaskInfoWithId(0)))
+
+    // Fail the second task with FetchFailed.
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
+      null))
+
+    assert(failure == null, "job should not fail")
+    val failedStages = scheduler.failedStages.toSeq
+    assert(failedStages.length == 2)
+    // Shuffle blocks of "hostA" are lost, so the first task of the shuffle map stage needs to
+    // be retried.
+    assert(failedStages.collect {
+      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId => stage
+    }.head.findMissingPartitions() == Seq(0))
+    // The first task of the result stage remains completed.
+    assert(failedStages.collect {
+      case stage: ResultStage => stage
+    }.head.findMissingPartitions() == Seq(1))
+  }
+
+  test("SPARK-23207: reliable checkpoint can avoid rollback (checkpointed before)") {
+    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.checkpoint()
+    shuffleMapRdd.doCheckpoint()
+    assertResultStageNotRolledBack(shuffleMapRdd)
+  }
+
+  test("SPARK-23207: reliable checkpoint fails to roll back (checkpointing now)") {
+    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.checkpoint()
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 50f10c31427d..9576605b1a21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -258,6 +258,9 @@ object ShuffleExchangeExec {
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
     }

+    val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
+      newPartitioning.numPartitions > 1
+
     val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
       // [SPARK-23207] Have to make sure the generated RoundRobinPartitioning is deterministic,
       // otherwise a retry task may output different rows and thus lead to data loss.
       //
       // Note that we don't perform local sort if the new partitioning has only 1 partition, under
       // that case all output rows go to the same partition.
-      val newRdd = if (SQLConf.get.sortBeforeRepartition &&
-          newPartitioning.numPartitions > 1 &&
-          newPartitioning.isInstanceOf[RoundRobinPartitioning]) {
+      val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
         rdd.mapPartitionsInternal { iter =>
           val recordComparatorSupplier = new Supplier[RecordComparator] {
             override def get: RecordComparator = new RecordBinaryComparator()
@@ -305,17 +306,19 @@ object ShuffleExchangeExec {
       rdd
     }

+    // The round-robin function is order-sensitive if we don't sort the input.
+    val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
     if (needToCopyObjectsBeforeShuffle(part)) {
-      newRdd.mapPartitionsInternal { iter =>
+      newRdd.mapPartitionsWithIndexInternal((_, iter) => {
         val getPartitionKey = getPartitionKeyExtractor()
         iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
-      }
+      }, isOrderSensitive = isOrderSensitive)
     } else {
-      newRdd.mapPartitionsInternal { iter =>
+      newRdd.mapPartitionsWithIndexInternal((_, iter) => {
         val getPartitionKey = getPartitionKeyExtractor()
         val mutablePair = new MutablePair[Int, InternalRow]()
         iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
-      }
+      }, isOrderSensitive = isOrderSensitive)
     }
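
A note on the Partitioner contract added above: the determinism requirement exists because a
reducer that re-fetches map output after a failure must see the same rows land in the same
partitions. As a minimal illustration (not part of the patch; the class name is made up), a
partitioner that violates the contract looks like this:

    import org.apache.spark.Partitioner

    // Hypothetical contract violation: the partition id is not a pure function of
    // the key, so a retried map task can route the same record to a different reducer.
    class RandomPartitioner(override val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int =
        scala.util.Random.nextInt(numPartitions) // non-deterministic
    }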
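The `maxBy(_.id)` in `getOutputDeterministicLevel` relies on `DeterministicLevel` declaring its
values in order of decreasing determinism: `Enumeration` assigns ids in declaration order, so
DETERMINATE = 0, UNORDERED = 1 and INDETERMINATE = 2, meaning the candidate with the largest id
is the least deterministic one. A standalone sketch of that propagation rule (the demo object is
not part of the patch):

    object DeterministicLevelDemo extends App {
      object DeterministicLevel extends Enumeration {
        val DETERMINATE, UNORDERED, INDETERMINATE = Value
      }
      import DeterministicLevel._

      // One UNORDERED parent caps the child at UNORDERED.
      val candidates = Seq(DETERMINATE, UNORDERED)
      println(candidates.maxBy(_.id)) // prints UNORDERED
    }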
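The `collectStagesToRollback` recursion in DAGScheduler can also be exercised in isolation:
stages only know their parents, so the traversal walks parent chains from each job's final stage,
and once a chain reaches a stage already marked for rollback, every stage after it on that chain
is marked too. A self-contained sketch with a toy Stage type (not Spark's scheduler classes):

    object RollbackTraversalDemo extends App {
      case class Stage(id: Int, parents: List[Stage] = Nil)

      val failed = Stage(1)
      val middle = Stage(2, List(failed))
      val finalStage = Stage(3, List(middle))

      val stagesToRollback = scala.collection.mutable.HashSet(failed)

      // Same shape as the patch: walk toward the roots, then mark the chain's tail.
      def collectStagesToRollback(stageChain: List[Stage]): Unit = {
        if (stagesToRollback.contains(stageChain.head)) {
          stageChain.drop(1).foreach(s => stagesToRollback += s)
        } else {
          stageChain.head.parents.foreach(s => collectStagesToRollback(s :: stageChain))
        }
      }

      collectStagesToRollback(finalStage :: Nil)
      println(stagesToRollback.map(_.id)) // contains 1, 2 and 3: the whole chain rolls back
    }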
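For users, the practical takeaway matches the error message and the new tests: a reliable
checkpoint that is materialized before the repartition makes the RDD DETERMINATE (the
"checkpointed before" tests), while a checkpoint that has not run yet does not help (the
"checkpointing now" tests). A hedged end-to-end sketch; the checkpoint directory, data and
partition counts are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    object CheckpointBeforeRepartition {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
        sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder directory

        val input = sc.parallelize(1 to 100000)
        input.cache()      // avoid recomputing the lineage while checkpointing
        input.checkpoint() // a reliable checkpoint makes the RDD DETERMINATE once written
        input.count()      // run an action so the checkpoint is materialized before the
                           // repartition job ("checkpointed before", not "checkpointing now")

        val repartitioned = input.repartition(10) // now safe against stage retries
        println(repartitioned.count())
        sc.stop()
      }
    }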
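Finally, why the ShuffleExchangeExec change marks the un-sorted round-robin path as
order-sensitive: the target partition depends only on a row's position in the iterator, not on
its content, so two runs that see the same rows in different orders place them differently.
A plain-Scala sketch (the helper is made up, not Spark's partitioner):

    object RoundRobinDemo extends App {
      // Assign partitions purely by position, the way round-robin repartitioning does.
      def roundRobin[T](rows: Seq[T], numPartitions: Int): Map[T, Int] =
        rows.zipWithIndex.map { case (row, i) => row -> i % numPartitions }.toMap

      val runA = roundRobin(Seq("a", "b", "c", "d"), numPartitions = 2)
      val runB = roundRobin(Seq("b", "a", "c", "d"), numPartitions = 2) // rerun, new order
      println(runA("a") == runB("a")) // false: "a" changed partitions between runs
    }

This is exactly the hazard that sorting before repartition removes (the sort makes the input
order deterministic) and that the new isOrderSensitive flag reports when the sort is disabled.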