DAGScheduler.scala

@@ -1241,6 +1241,7 @@ class DAGScheduler(
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
} else {
var abortedStage = false
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
@@ -1256,11 +1257,13 @@ class DAGScheduler(
if (disallowStageRetryForTest) {
abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
None)
abortedStage = true
} else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
s"has failed the maximum allowable number of " +
s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
abortedStage = true
Member:
There is another abortStage call in the if (disallowStageRetryForTest) branch.

} else if (failedStages.isEmpty) {
Contributor:
Instead of having the abortedStage variable, how about re-writing the "else if" statement to be:

else {
if (failedStages.isEmpty) {
... stuff currently in else-if ...
}
failedStages += failedStage
failedStages += mapStage
}

That eliminates the confusion of the extra abortedStage flag, as @zsxwing pointed out, and also makes the relationship between (i) adding the stage to failedStages and (ii) scheduling the Resubmit event clearer.

Contributor Author:
It makes sense to me; updated.
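For reference, a rough sketch of how the branch reads after the suggested restructuring (assuming the surrounding handler context shown in the diff above; failedStages, failedStage, mapStage, messageScheduler, and eventProcessLoop are the existing DAGScheduler members it references):

} else {
  // The stage was not aborted, so record the failure and make sure a
  // ResubmitFailedStages event is (or already has been) scheduled.
  if (failedStages.isEmpty) {
    // Only schedule the resubmit event when no other failed stage is pending;
    // otherwise the event has already been scheduled.
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
  }
  failedStages += failedStage
  failedStages += mapStage
}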

// Don't schedule an event to resubmit failed stages if failedStages isn't empty, because
// in that case the event will already have been scheduled.
@@ -1271,8 +1274,10 @@ class DAGScheduler(
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
failedStages += failedStage
failedStages += mapStage
if (!abortedStage) {
failedStages += failedStage
failedStages += mapStage
}
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
DAGSchedulerSuite.scala

@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean

import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,6 +32,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.FetchFailedException
Contributor:
nit: can you group this with the next import (so import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException})

import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
@@ -2105,6 +2107,59 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
}

test("After one stage is aborted for too many failed attempts, subsequent stages" +
"still behave correctly on fetch failures") {
Contributor:
Can you add the JIRA number here? That helps with tracking tests in the future. So something like "[SPARK-17644] After one stage is aborted..."

def fetchFailJob: Unit = {
Contributor:
to make this a little more descriptive / easy to read, how about calling the helper "runJobWithPersistentFetchFailure" and then add a comment that says "Runs a job that always encounters a fetch failure, so should eventually be aborted."

val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd1.map {
case (x, _) if (x == 1) =>
throw new FetchFailedException(
BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
case (x, _) => x
}.count()
}

def successJob: Unit = {
Contributor:
and for this perhaps call it "runJobWithTemporaryFetchFailure" and then comment saying "Runs a job that encounters a single fetch failure but succeeds on the second attempt"

object FailThisAttempt {
val _fail = new AtomicBoolean(true)
}
val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd1.map {
case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
throw new FetchFailedException(
BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
case (x, _) => x
}.count()
}

failAfter(60.seconds) {
val e = intercept[SparkException] {
fetchFailJob
}
assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
}

// The following job that fails due to fetching failure will hang without
// the fix for SPARK-17644
Contributor:
Can you change to something like "Run a second job that will fail due to a fetch failure. This job will hang without the fix for SPARK-17644."

failAfter(60.seconds) {
Contributor:
I think a shorter timeout would be appropriate here to avoid slow-ness when this fails...maybe 10 seconds? That still seems plenty conservative since the resubmit timeout is 200 millis.

val e = intercept[SparkException] {
fetchFailJob
}
assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
}

failAfter(60.seconds) {
try {
successJob
} catch {
case e: Throwable => fail("this job should succeed")
Contributor:
Can you make this a little more descriptive -- maybe "A job with one fetch failure should eventually succeed"?

}
}
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.