10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1554,6 +1554,11 @@ private[spark] class DAGScheduler(
case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
sms.shuffleDep.newShuffleMergeState()
case rs: ResultStage if stage.isIndeterminate &&
stage.findMissingPartitions().length != rs.partitions.length =>
abortStage(stage, "An indeterminate result stage cannot be reverted", None)
runningStages -= stage
return
case _ =>

I have two questions:

  1. The window which I was talking about is this, and I think it is present in the above code.
    a) The first result task failed, due to which the event loop thread submitted a ResubmitFailedStages message.
    b) But before this message was put in the queue, a new message for a successful result task arrived.
    So now the sequence of messages is: ResultTask (successful), followed by ResubmitFailedStages.
    The event loop thread picks up the successful result task, and now there is an output available.
    The event loop thread then picks up ResubmitFailedStages, sees stage.findMissingPartitions().length != rs.partitions.length,
    and proceeds to abort the stage (which I believe means the query will be aborted... right?).

  2. Why are you making a new stage attempt when there is already a committed result (isn't there?)? And since it cannot be reverted, the query needs to abort?
    (I may be wrong in my understanding of the DAGScheduler and stage code, so please bear with me.)
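
To make the ordering in 1. concrete: it can be forced with the DagSchedulerInterceptor hook this PR adds to DAGSchedulerSuite. The sketch below only illustrates that idea (it mirrors the beforeAddingDagEventToQueue part of createDagInterceptorForSpark51272); succeedingResultTask and someResult are hypothetical placeholders for whatever task and result a concrete test would use.

// Illustrative sketch, assumed to live inside DAGSchedulerSuite so that
// runEvent and makeCompletionEvent are in scope: inject a successful ResultTask
// completion right before ResubmitFailedStages is enqueued.
def orderingInterceptor(
    succeedingResultTask: () => Task[_],
    someResult: Any): DagSchedulerInterceptor = new DagSchedulerInterceptor {
  override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = {
    event match {
      case ResubmitFailedStages =>
        // The successful completion is processed first, so the scheduler already
        // sees a finished result partition when it handles ResubmitFailedStages.
        runEvent(makeCompletionEvent(succeedingResultTask(), Success, someResult))
      case _ => // leave every other event untouched
    }
  }
}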


@attilapiros: also I am wondering if you had a clean build? Because I think in my changes, when I did something like this, I saw a valid indeterminate stage which had some partitions missing, but that code path was not from a failed task... it was a proper path...
so with this change, that path would also encounter the abort.

Owner Author

There was one failure in Streaming and one in SQL, but both were unrelated.

Owner Author

1.) There is no window. b) is actually tested by your createDagInterceptorForSpark51272, isn't it?
2.) That was a mistake.

}
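
For intuition, the new branch above encodes a simple rule, shown here as a standalone sketch rather than actual DAGScheduler code: an indeterminate result stage can only be rolled back while all of its partitions are still missing, because a result partition that has already completed cannot be recomputed consistently.

// Standalone sketch of the rule the new guard encodes (not Spark's API).
// Rolling back an indeterminate result stage is safe only while no partition
// has completed yet; otherwise the stage (and hence the job) must be aborted.
def canRollBackIndeterminateResultStage(
    missingPartitions: Int,
    totalPartitions: Int): Boolean = {
  missingPartitions == totalPartitions
}

assert(canRollBackIndeterminateResultStage(missingPartitions = 2, totalPartitions = 2))
assert(!canRollBackIndeterminateResultStage(missingPartitions = 1, totalPartitions = 2))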

@@ -1939,6 +1944,10 @@ private[spark] class DAGScheduler(
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
if (failedStages.contains(stage) && stage.isIndeterminate) {
logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because " +
log"this indeterminate stage failed earlier")
} else {
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
@@ -1983,6 +1992,7 @@ private[spark] class DAGScheduler(
case None =>
logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished")
}
}

case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
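
The second hunk above drops a late successful ResultTask when its indeterminate stage has already been marked as failed. The following is a standalone sketch of that decision, not Spark code; handleResultTaskCompletion and recordResult are names invented here for illustration.

// Standalone sketch: a stale successful result from an indeterminate stage that
// already failed is ignored, because the resubmitted attempt will recompute
// every partition of that stage anyway.
def handleResultTaskCompletion(
    stageIsIndeterminate: Boolean,
    stageAlreadyFailed: Boolean)(recordResult: () => Unit): Unit = {
  if (stageIsIndeterminate && stageAlreadyFailed) {
    println("Ignoring result from a failed indeterminate stage")
  } else {
    recordResult()
  }
}
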
251 changes: 241 additions & 10 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -51,7 +51,9 @@ import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockMan
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.ArrayImplicits._

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
class DAGSchedulerEventProcessLoopTester(
dagScheduler: DAGScheduler,
dagSchedulerInterceptorOpt: Option[DagSchedulerInterceptor] = None)
extends DAGSchedulerEventProcessLoop(dagScheduler) {

dagScheduler.setEventProcessLoop(this)
@@ -64,12 +66,15 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
if (isProcessing) {
// `DAGSchedulerEventProcessLoop` is guaranteed to process events sequentially. So we should
// buffer events for sequent processing later instead of processing them recursively.
dagSchedulerInterceptorOpt.foreach(_.beforeAddingDagEventToQueue(event))
eventQueue += event
dagSchedulerInterceptorOpt.foreach(_.afterAddingDagEventToQueue(event))
} else {
try {
isProcessing = true
// Forward event to `onReceive` directly to avoid processing event asynchronously.
onReceive(event)
dagSchedulerInterceptorOpt.foreach(_.afterDirectProcessingOfDagEvent(event))
} catch {
case NonFatal(e) => onError(e)
} finally {
@@ -175,6 +180,12 @@ class DummyScheduledFuture(

class DAGSchedulerSuiteDummyException extends Exception

trait DagSchedulerInterceptor {
def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = {}
def afterAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = {}
def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): Unit = {}
}

class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with TimeLimits {

import DAGSchedulerSuite._
@@ -300,6 +311,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
}

var sparkListener: EventInfoRecordingListener = null
var dagSchedulerInterceptor: DagSchedulerInterceptor = null

var blockManagerMaster: BlockManagerMaster = null
var mapOutputTracker: MapOutputTrackerMaster = null
@@ -444,7 +456,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
blockManagerMaster,
sc.env))

dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler,
Option(dagSchedulerInterceptor))
}

override def afterEach(): Unit = {
@@ -453,6 +466,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
dagEventProcessLoopTester.stop()
mapOutputTracker.stop()
broadcastManager.stop()
this.dagSchedulerInterceptor = null
} finally {
super.afterEach()
}
@@ -3153,25 +3167,43 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
null))
(shuffleId1, shuffleId2)
}
private def constructTwoIndeterminateStage(): (Int, Int) = constructTwoStages(true, true)

private def constructTwoIndeterminateStage(): (Int, Int) = {
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true)
private def constructTwoStages(
stage1InDeterminate: Boolean,
stage2InDeterminate: Boolean,
isDependencyBetweenStagesTransitive: Boolean = true): (Int, Int) = {
val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = stage1InDeterminate)

val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
val shuffleId1 = shuffleDep1.shuffleId
val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker,
indeterminate = true)
val shuffleMapRdd2 = if (isDependencyBetweenStagesTransitive) {
new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker,
indeterminate = stage2InDeterminate)
} else {
new MyRDD(sc, 2, Nil, tracker = mapOutputTracker, indeterminate = stage2InDeterminate)
}

val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(2))
val shuffleId2 = shuffleDep2.shuffleId
val finalRdd = new MyRDD(sc, 2, List(shuffleDep2), tracker = mapOutputTracker)

val finalRdd = if (isDependencyBetweenStagesTransitive) {
new MyRDD(sc, 2, List(shuffleDep2), tracker = mapOutputTracker)
} else {
new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2), tracker = mapOutputTracker)
}

submit(finalRdd, Array(0, 1))
val stageId1 = this.scheduler.shuffleIdToMapStage(shuffleId1).id

// Finish the first shuffle map stage.
completeShuffleMapStageSuccessfully(0, 0, 2)
assert(mapOutputTracker.findMissingPartitions(shuffleId1) === Some(Seq.empty))

completeShuffleMapStageSuccessfully(stageId1, 0, 2)
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.SpanSugar._
eventually(timeout(1.minutes), interval(500.milliseconds)) {
mapOutputTracker.findMissingPartitions(shuffleId1) should equal(Some(Nil))
}
(shuffleId1, shuffleId2)
}

@@ -3185,6 +3217,157 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
"Spark can only do this while using the new shuffle block fetching protocol"))
}



test("SPARK-51272: retry all the partitions of result stage, if the first result task" +
" has failed and failing ShuffleMap stage is inDeterminate") {
this.dagSchedulerInterceptor = createDagInterceptorForSpark51272(
() => taskSets.find(_.shuffleId.isEmpty).get.tasks(1), "RELEASE_LATCH")

val numPartitions = 2
// The first shuffle stage is completed by the below function itself which creates two
// stages.
val (shuffleId1, shuffleId2) = constructTwoStages(
stage1InDeterminate = false,
stage2InDeterminate = true,
isDependencyBetweenStagesTransitive = false)
val shuffleStage1 = this.scheduler.shuffleIdToMapStage(shuffleId1)
val shuffleStage2 = this.scheduler.shuffleIdToMapStage(shuffleId2)
completeShuffleMapStageSuccessfully(shuffleStage2.id, 0, numPartitions)
val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
val activeJob = resultStage.activeJob
assert(activeJob.isDefined)
// The result stage is still waiting for its 2 tasks to complete
assert(resultStage.findMissingPartitions() == Seq.tabulate(numPartitions)(i => i))

// The below event is going to initiate the retry of the previous indeterminate stages,
// and also the retry of all result tasks. But before the "ResubmitFailedStages" event is
// added to the scheduler's queue, a successful completion of a result partition task is
// added to the event queue. In this scenario the bug surfaces: instead of retrying all
// partitions of the result stage (2 tasks in total), only some (1 task) get retried.
runEvent(
makeCompletionEvent(
taskSets.find(_.stageId == resultStage.id).get.tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, "ignored"),
null))

import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.SpanSugar._
eventually(timeout(3.minutes), interval(500.milliseconds)) {
shuffleStage1.latestInfo.attemptNumber() should equal(1)
}
completeShuffleMapStageSuccessfully(shuffleStage1.id, 1, numPartitions)

eventually(timeout(3.minutes), interval(500.milliseconds)) {
shuffleStage2.latestInfo.attemptNumber() should equal(1)
}
completeShuffleMapStageSuccessfully(shuffleStage2.id, 1, numPartitions)
eventually(timeout(3.minutes), interval(500.milliseconds)) {
resultStage.latestInfo.attemptNumber() should equal(1)
}
org.scalatest.Assertions.assert(resultStage.latestInfo.numTasks == numPartitions)
org.scalatest.Assertions.assert(resultStage.findMissingPartitions().size == numPartitions)
}

test("SPARK-51272: retry all the partitions of result stage, if the first result task" +
" has failed with failing ShuffleStage determinate but result stage has another ShuffleStage" +
" which is indeterminate") {
this.dagSchedulerInterceptor = createDagInterceptorForSpark51272(
() => taskSets.find(_.shuffleId.isEmpty).get.tasks(1), "RELEASE_LATCH")

val numPartitions = 2
// The first shuffle stage is completed by the below function itself which creates two
// stages.
val (detShuffleId1, indetShuffleId2) = constructTwoStages(
stage1InDeterminate = false,
stage2InDeterminate = true,
isDependencyBetweenStagesTransitive = false)
val detShuffleStage1 = this.scheduler.shuffleIdToMapStage(detShuffleId1)
val inDetshuffleStage2 = this.scheduler.shuffleIdToMapStage(indetShuffleId2)
completeShuffleMapStageSuccessfully(inDetshuffleStage2.id, 0, numPartitions)
assert(mapOutputTracker.findMissingPartitions(indetShuffleId2) === Some(Seq.empty))
val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
val activeJob = resultStage.activeJob
assert(activeJob.isDefined)
// The result stage is still waiting for its 2 tasks to complete
assert(resultStage.findMissingPartitions() == Seq.tabulate(numPartitions)(i => i))


// The below event will cause the first task of the result stage to fail.
// The following should happen if the scheduler behaves correctly:
// since the result stage depends on two shuffles, one of which is indeterminate,
// the retry of the ResultStage should cover both tasks, even if the failed shuffle
// stage is deterministic, because at this point there is no guarantee whether the
// indeterminate shuffle stage 2 has also failed or not. If the indeterminate stage
// had hypothetically failed for result partition 1 but succeeded for result
// partition 2, then re-execution of shuffle stage 2 (indeterminate) would produce
// wrong results. So to avoid this, once an indeterminate result stage is being
// retried, no successful partitions from a stale attempt should be accepted.
//
runEvent(
makeCompletionEvent(
taskSets.find(_.shuffleId.isEmpty).get.tasks(0),
FetchFailed(makeBlockManagerId("hostA"), detShuffleId1, 0L, 0, 0, "ignored"),
null))

import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.SpanSugar._
eventually(timeout(3.minutes), interval(500.milliseconds)) {
detShuffleStage1.latestInfo.attemptNumber() should equal(1)
}
completeShuffleMapStageSuccessfully(detShuffleStage1.id, 1, numPartitions)

// Though inDetShuffleStage2 has not suffered any loss itself, the DAGScheduler
// removes shuffle outputs based on the lost BlockManager, which in this case will
// result in the loss of shuffle 2's output as well. It loses one partition and
// hence will be re-attempted.
// But that re-attempt should include all partitions!
eventually(timeout(3.minutes), interval(500.milliseconds)) {
inDetshuffleStage2.latestInfo.attemptNumber() should equal(1)
}
org.scalatest.Assertions.assert(inDetshuffleStage2.latestInfo.numTasks == 2)
org.scalatest.Assertions.assert(inDetshuffleStage2.findMissingPartitions().size == 2)
completeShuffleMapStageSuccessfully(inDetshuffleStage2.id, 1, numPartitions)
eventually(timeout(3.minutes), interval(500.milliseconds)) {
resultStage.latestInfo.attemptNumber() should equal(1)
}
org.scalatest.Assertions.assert(resultStage.latestInfo.numTasks == numPartitions)
}

test("SPARK-51272: retry all the partitions of Shuffle stage, if any task of ShuffleStage " +
" has failed and failing ShuffleMap stage is inDeterminate") {
val numPartitions = 2
this.dagSchedulerInterceptor = createDagInterceptorForSpark51272(
() => taskSets.filter(_.shuffleId.isDefined).maxBy(_.shuffleId.get).tasks(1),
makeMapStatus(host = "hostZZZ", reduces = numPartitions))
// The first shuffle stage is completed by the below function itself, which creates
// two stages (only the second of which is indeterminate).
val (shuffleId1, shuffleId2) = constructTwoStages(
stage1InDeterminate = false,
stage2InDeterminate = true,
isDependencyBetweenStagesTransitive = false
)
// This will trigger the resubmission of the failed stage; before the resubmit message
// is added to the queue, a successful partition completion event will arrive.
runEvent(
makeCompletionEvent(
taskSets.filter(_.shuffleId.isDefined).maxBy(_.shuffleId.get).tasks(0),
FetchFailed(makeBlockManagerId("hostB"), shuffleId2, 0L, 0, 0, "ignored"),
null))

val shuffleStage2 = scheduler.shuffleIdToMapStage(shuffleId2)
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.SpanSugar._

eventually(timeout(30.seconds), interval(500.milliseconds)) {
shuffleStage2.latestInfo.attemptNumber() should equal(1)
}
org.scalatest.Assertions.assert(shuffleStage2.findMissingPartitions().size == numPartitions)
}

test("SPARK-25341: retry all the succeeding stages when the map stage is indeterminate") {
val (shuffleId1, shuffleId2) = constructIndeterminateStageFetchFailed()

@@ -5135,6 +5318,54 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
}
CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, metricPeaks, taskInfo)
}

private def createDagInterceptorForSpark51272(latchReleaseTask: () => Task[_], taskResult: Any):
DagSchedulerInterceptor = {
new DagSchedulerInterceptor {
val latch = new CountDownLatch(1)
override def beforeAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = {
event match {
case ResubmitFailedStages =>
// Before the ResubmitFailedStages event is added to the queue, add the successful
// partition task completion.
runEvent(makeCompletionEvent(latchReleaseTask(), Success, taskResult))

case _ =>
}
}

override def afterAddingDagEventToQueue(event: DAGSchedulerEvent): Unit = {
event match {
case CompletionEvent(_, reason, result, _, _, _) =>
reason match {
case Success if result == taskResult => latch.countDown()

case _ =>
}

case _ =>
}
}

override def afterDirectProcessingOfDagEvent(event: DAGSchedulerEvent): Unit = {
event match {
case CompletionEvent(_, reason, _, _, _, _) =>
reason match {
case FetchFailed(_, _, _, _, _, _) =>
// Do not allow this thread to exit until the spurious successful task
// (the latchRelease task) gets into the queue. This ensures that the
// ResubmitFailedStages event will always be processed after the spurious
// task is processed.
latch.await(50, TimeUnit.SECONDS)

case _ =>
}

case _ =>
}
}
}
}
}

class DAGSchedulerAbortStageOffSuite extends DAGSchedulerSuite {