
Conversation

@scwf
Contributor

@scwf scwf commented Sep 23, 2016

What changes were proposed in this pull request?

| Time | Thread 1, Job1 | Thread 2, Job2 |
|:----:|:---------------|:---------------|
| 1 | abort stage due to FetchFailed | |
| 2 | failedStages += failedStage | |
| 3 | | task failed due to FetchFailed |
| 4 | | can not post ResubmitFailedStages because failedStages is not empty |

Then Job2 on Thread 2 never resubmits the failed stage and hangs.

We should not add to failedStages when abortStage is called for a fetch failure (see the sketch below).
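
A minimal sketch of the idea (illustrative only; names like shouldAbortStage and abortMessage are placeholders, not the exact diff): when aborting, do not touch failedStages, so a later fetch failure still sees an empty set and can schedule a resubmit.

    // Illustrative sketch, not the actual patch.
    if (shouldAbortStage) {
      // Abort without adding to failedStages.
      abortStage(failedStage, abortMessage, None)
    } else {
      if (failedStages.isEmpty) {
        // Nothing is pending, so enqueue a ResubmitFailedStages event.
        messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
          },
          DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
      }
      failedStages += failedStage
      failedStages += mapStage
    }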

How was this patch tested?

Added a unit test.

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65818 has finished for PR 15213 at commit d02cf93.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65822 has finished for PR 15213 at commit 7056cd6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65823 has finished for PR 15213 at commit 1f7bd88.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
  rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd1.map { x =>

You may write

rdd1.map {
  case (x, _) =>
    if (x == 1) {
      throw new FetchFailedException(
        BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
    }
    x
}.count()

instead. Pattern matching on tuples reads better than accessing the _1 attribute.


Or even

rdd1.map {
  case (x, _) if (x == 1) =>
    throw new FetchFailedException(
      BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
  case (x, _) => x
}.count()

Contributor Author


ok, thanks

@JoshRosen
Contributor

/cc @kayousterhout @markhamstra for review of scheduler changes.

.fromExecutorService(Executors.newFixedThreadPool(5))
val duration = 60.seconds

val f1 = Future {
Member


You can just use failAfter(60.seconds) { ... } to get rid of ExecutionContext and Future.
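
For example, a sketch of what that would look like (assuming the suite mixes in ScalaTest's TimeLimits and SpanSugar so that failAfter and 60.seconds are available):

    failAfter(60.seconds) {
      rdd1.map { case (x, _) => x }.count()
    }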

case (x, _) => x
}.count()
} catch {
case e: Throwable =>
Member


Don't catch Throwable; it's better to check the exception like this:

    failAfter(60.seconds) {
      val e = intercept[SparkException] {
        ...
      }
      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    }

}
}
ThreadUtils.awaitResult(f1, duration)
val f2 = Future {
Member


Could you add a comment to explain why two identical jobs are needed here? It took me a while to figure out. E.g.,

The following job that fails due to fetching failure will hang without the fix for SPARK-17644

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65829 has finished for PR 15213 at commit d92adfc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

s"has failed the maximum allowable number of " +
s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
abortedStage = true
Member


There is another abortStage call in the if (disallowStageRetryForTest) { branch.

case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleIdToMapStage(shuffleId)
var abortedStage = false
Member


Could you move this line just above if (disallowStageRetryForTest) { since it's only used in that scope.

@markhamstra
Contributor

This doesn't make sense to me. The DAGSchedulerEventProcessLoop runs on a single thread and processes a single event from its queue at a time.

When the first CompletionEvent is run as a result of a fetch failure, failedStages is added to and a ResubmitFailedStages event is queued. After handleTaskCompletion is done, the next event from the queue will be processed. As events are sequentially dequeued and handled, either the ResubmitFailedStages event will be handled before the CompletionEvent for the second fetch failure, or the CompletionEvent will be handled before the ResubmitFailedStages event.

If the ResubmitFailedStages event is handled first, then failedStages will be cleared in resubmitFailedStages, and there will be nothing preventing the subsequent CompletionEvent from queueing another ResubmitFailedStages event to handle additional fetch failures. In the alternative, where the second CompletionEvent is queued and handled before the ResubmitFailedStages event, the additional stages are added to the non-empty failedStages, but there is no need to schedule another ResubmitFailedStages event, because the one from the first CompletionEvent is still on the queue and the handling of that queued event will also handle the newly added failedStages from the second CompletionEvent.

In either ordering, all the failedStages are handled and there is no race condition.

@zsxwing
Member

zsxwing commented Sep 23, 2016

@markhamstra I agree this is not a race condition, since there is only a single thread.

The issue is that the code doesn't handle the following two corner cases:

  • failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) && failedStages.isEmpty is true
  • disallowStageRetryForTest && failedStages.isEmpty is true

In the above cases, ResubmitFailedStages won't be posted but failedStages will become non-empty. Then in the next job, failedStages.isEmpty returns false, and ResubmitFailedStages still won't be posted.
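
A rough sketch of the pre-fix shape of this code path (simplified and paraphrased from DAGScheduler.handleTaskCompletion, not the exact source) makes that visible:

    // Simplified, for illustration only.
    if (disallowStageRetryForTest) {
      abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None)
    } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
      abortStage(failedStage, s"$failedStage has failed the maximum allowable number of times", None)
    } else if (failedStages.isEmpty) {
      // Only schedules a resubmit when failedStages is empty; the two abort
      // branches above never reach this point.
      messageScheduler.schedule(
        new Runnable {
          override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
        },
        DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
    }
    // Runs even when an abort branch was taken, so failedStages becomes non-empty
    // without a ResubmitFailedStages event ever being posted.
    failedStages += failedStage
    failedStages += mapStage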

@markhamstra
Contributor

Ok, that makes better sense.

The disallowStageRetryForTest case doesn't worry me too much since it is only used in tests. If we can fix this case, great; else if it remains possible to create failing tests that can never happen outside of the tests, then that is not all that important (but should at least be noted in comments in the test suite.)

Yes, not adding to failedStages after going down either of those two paths to abortStage is a correct fix even if the description of the problem wasn't really accurate. I'll take another look over the weekend to see if the logic can be expressed a bit more clearly.

@scwf
Contributor Author

scwf commented Sep 23, 2016

Thanks @zsxwing for explaining this.
@markhamstra the issue happens in the case described in my PR description. It usually depends on multiple threads submitting jobs and on the order of the fetch failures, which is why I called it a race condition.

If you think that is confusing, how about changing the title to "do not add failedStages when abort stage"?

@markhamstra
Contributor

@scwf That description would actually be at least as bad since there are multiple routes to abortStage and this issue of adding to failedStages only applies to these two. I'll take another look soon and see if I can come up with a clean refactoring and a better description for the commit message.

@scwf
Contributor Author

scwf commented Sep 23, 2016

Actually, failedStages is only added to here in Spark.

@markhamstra
Contributor

Right, but abortStage occurs elsewhere. "When abort stage" seems to imply that this fix is necessary for all usages of abortStage when the actual problem is not in abortStage but rather in improper additions to failedStages. I've got to go now, but I'll come back to this soon(ish).

@scwf
Contributor Author

scwf commented Sep 24, 2016

actual problem is not in abortStage but rather in improper additions to failedStages

Correct. I think a more accurate description for this issue is "do not add failedStages when abortStage for fetch failure".
We do not care about the other abortStage call sites, since they do not add to failedStages.

@scwf scwf changed the title [SPARK-17644] [CORE] Fix the race condition when DAGScheduler handle the FetchFailed event [SPARK-17644] [CORE] Do not add failedStages when abortStage for fetch failure Sep 24, 2016
@SparkQA

SparkQA commented Sep 24, 2016

Test build #65853 has finished for PR 15213 at commit 1127ca1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markhamstra
Contributor

markhamstra commented Sep 26, 2016

The fix is logically correct; however, the prior code is needlessly complex and not as easy to understand as it should be, and the proposed fix doesn't improve on that. I'd like to take the opportunity to make the code easier to understand and maintain. Something like this:

          // It is likely that we receive multiple FetchFailed for a single stage (because we have
          // multiple tasks running concurrently on different executors). In that case, it is
          // possible the fetch failure has already been handled by the scheduler.
          if (runningStages.contains(failedStage)) {
            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
              s"due to a fetch failure from $mapStage (${mapStage.name})")
            markStageAsFinished(failedStage, Some(failureMessage))
          } else {
            logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
              s"longer running")
          }

          val shouldAbortStage =
            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
            disallowStageRetryForTest

          if (shouldAbortStage) {
            val abortMessage = if (disallowStageRetryForTest) {
              "Fetch failure will not retry stage due to testing config"
            } else {
              s"$failedStage (${failedStage.name}) " +
              s"has failed the maximum allowable number of " +
              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
              s"Most recent failure reason: $failureMessage"
            }
            abortStage(failedStage, abortMessage, None)
          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
            val noResubmitEnqueued = failedStages.isEmpty
            failedStages += failedStage
            failedStages += mapStage
            if (noResubmitEnqueued) {
              // If failedStages was not empty, then a previous FetchFailed already went through
              // this block of code and queued up a ResubmitFailedStages event that has not yet
              // run.  We, therefore, only need to queue up a new ResubmitFailedStages event when
              // failedStages was empty.
              logInfo(
                s"Resubmitting $mapStage (${mapStage.name}) and " +
                s"$failedStage (${failedStage.name}) due to fetch failure"
              )
              messageScheduler.schedule(
                new Runnable {
                  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
                },
                DAGScheduler.RESUBMIT_TIMEOUT,
                TimeUnit.MILLISECONDS
              )
            }
          }

          // Mark the map whose fetch failed as broken in the map stage

This should be equivalent to what you have, @scwf, with the exception that fetchFailedAttemptIds.add(stageAttemptId) is done even when disallowStageRetryForTest is true -- which seems like a better idea to me.

Also available here:
markhamstra@368f82d

@squito

@scwf
Contributor Author

scwf commented Sep 27, 2016

@markhamstra in my fix I just wanted to make minimal changes to the DAGScheduler, but your fix is also fine by me; I can update this PR according to your comment. Thanks :)
/cc @zsxwing, who may also have comments on this.

@markhamstra
Contributor

@scwf I understand that you were trying to make the least invasive fix possible to deal with the problem. That's usually a good thing to do, but even when that kind of fix is getting to the root of the problem it can still result in layers of patches that are hard to make sense of. That's not really the fault of any one patch; rather, the blame lies more with those of us who often didn't produce clear, maintainable code in the first place. When it's possible to see re-organizing principles that will make the code clearer, reduce duplication, make future maintenance less error prone, etc., then it's usually a good idea to do a little larger refactoring instead of just a minimally invasive fix.

I think this is a small example of where that kind of refactoring makes sense, so that's why I made my code suggestion. If you can see ways to make things even clearer, then feel free to suggest them. I'm sure that Kay, Imran and others who also have been trying to make these kinds of clarifying changes in the DAGScheduler will also chime in if they have further suggestions.

@squito
Contributor

squito commented Sep 27, 2016

gosh this is a serious bug, can't believe we haven't found it already. Thanks for reporting and working on a fix.

Honestly, I think just getting the fix in is important enough that I'm fine w/ putting in the minimally invasive thing now. Perhaps also makes it easier to backport. Mark & I can keep discussing some cleanup independently. I'd like to see the test improved slightly, but otherwise I think this is fine to merge now.

assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
}

test("The failed stage never resubmitted due to abort stage in another thread") {
Contributor

@squito squito Sep 27, 2016


A couple of changes for this test:

(a) Earlier there was already some discussion around naming, and the PR title has been updated; this test should be renamed as well, since multiple threads really have nothing to do with it.
(b) I'd prefer if tests are named to indicate the positive behavior that we want to verify. So with the above, I'd suggest a name like "After one stage is aborted for too many failed attempts, subsequent stages still behave correctly on fetch failures"
(c) Duplicated code can be cleaned up (at first, when I read the code, I was looking for differences between the two calls; so though it's only one copy-paste, the intent is a lot clearer if it appears just once).

(d) I think it would be nice to also include a job which succeeds after a fetch failure at the end (3 jobs total). Unfortunately this is a bit of a pain to do in a test right now since you don't have access to stageAttemptId, but you can do it with something like this:

...
rdd1.map {
        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
          throw new FetchFailedException(
            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
...

with helper

object FailThisAttempt extends Logging {
  val _fail = new AtomicBoolean(true)
}

@markhamstra
Contributor

Honestly, I think just getting the fix in is important enough that I'm fine w/ putting in the minimally invasive thing now.

That's fine, @squito -- go ahead and merge when you're happy with your requested changes, and then I'll follow up in short order with a separate refactoring PR.

@SparkQA

SparkQA commented Sep 28, 2016

Test build #66006 has finished for PR 15213 at commit f91d86f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@kayousterhout kayousterhout left a comment


Thanks for fixing this and tests look great with @squito's suggestions. I added a few readability suggestions.

Agree with @markhamstra and @squito that in general it's nice to make the DAGScheduler more readable, but in this case we should do a minimal fix so it can be merged quickly and to minimize the risk of putting this in 2.0.1.

s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
abortedStage = true
} else if (failedStages.isEmpty) {
Contributor


Instead of having the abortedStage variable, how about re-writing the "else if" statement to be:

else {
  if (failedStages.isEmpty) {
    ... stuff currently in else-if ...
  }
  failedStages += failedStage
  failedStages += mapStage
}

That eliminates the confusion of multiple abortStage variables, as @zsxwing pointed out, and also makes the relationship between (i) adding the stage to failed stages and (ii) scheduling the Resubmit event more clear.

Contributor Author


It makes sense to me; updated.

import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.FetchFailedException
Contributor


nit: can you group this with the next import (so import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException})?

}

test("After one stage is aborted for too many failed attempts, subsequent stages" +
"still behave correctly on fetch failures") {
Contributor


Can you add the JIRA number here? That helps with tracking tests in the future. So something like "[SPARK-17644] After one stage is aborted..."


// The following job that fails due to fetching failure will hang without
// the fix for SPARK-17644
failAfter(60.seconds) {
Contributor


I think a shorter timeout would be appropriate here to avoid slowness when this fails... maybe 10 seconds? That still seems plenty conservative since the resubmit timeout is 200 millis.

try {
successJob
} catch {
case e: Throwable => fail("this job should success")
Contributor


Can you make this a little more descriptive -- maybe "A job with one fetch failure should eventually succeed"?

}

// The following job that fails due to fetching failure will hang without
// the fix for SPARK-17644
Contributor


Can you change to something like "Run a second job that will fail due to a fetch failure. This job will hang without the fix for SPARK-17644."


test("After one stage is aborted for too many failed attempts, subsequent stages" +
"still behave correctly on fetch failures") {
def fetchFailJob: Unit = {
Contributor


To make this a little more descriptive / easy to read, how about calling the helper "runJobWithPersistentFetchFailure" and then adding a comment that says "Runs a job that always encounters a fetch failure, so should eventually be aborted."

}.count()
}

def successJob: Unit = {
Contributor


And for this one, perhaps call it "runJobWithTemporaryFetchFailure" and add a comment saying "Runs a job that encounters a single fetch failure but succeeds on the second attempt".

@scwf
Contributor Author

scwf commented Sep 28, 2016

@kayousterhout Thanks for your comments, I have updated the PR based on all of them.

@SparkQA

SparkQA commented Sep 28, 2016

Test build #66031 has finished for PR 15213 at commit 09077cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@scwf
Contributor Author

scwf commented Sep 28, 2016

retest this please

@SparkQA

SparkQA commented Sep 28, 2016

Test build #66036 has finished for PR 15213 at commit 09077cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Sep 28, 2016

What else is needed to merge this?

@kayousterhout
Contributor

kayousterhout commented Sep 28, 2016

LGTM. My understanding of @markhamstra's comment above is that he's ok with this as-is; @squito any last comments here?

Thanks for fixing this (and adding the unit test) @scwf!

@zsxwing
Member

zsxwing commented Sep 28, 2016

LGTM. I'm going to merge this in favor of another RC since the fix here is correct. If there are any further style suggestions, we can do them in a follow-up PR.

try {
  runJobWithTemporaryFetchFailure
} catch {
  case e: Throwable => fail("A job with one fetch failure should eventually succeed")
Contributor


nit: you don't need to specifically catch an exception and call fail; the test will automatically fail on an unhandled exception.

@squito
Contributor

squito commented Sep 28, 2016

made a minor comment b/c I saw it, but don't think this should hold up getting it merged. lgtm for merging as-is and following up later.

@asfgit asfgit closed this in 46d1203 Sep 28, 2016
asfgit pushed a commit that referenced this pull request Sep 28, 2016
… failure

| Time        |Thread 1 ,  Job1          | Thread 2 ,  Job2  |
|:-------------:|:-------------:|:-----:|
| 1 | abort stage due to FetchFailed |  |
| 2 | failedStages += failedStage |    |
| 3 |      |  task failed due to  FetchFailed |
| 4 |      |  can not post ResubmitFailedStages because failedStages is not empty |

Then Job2 on Thread 2 never resubmits the failed stage and hangs.

We should not add to failedStages when abortStage is called for a fetch failure.

added unit test

Author: w00228970 <[email protected]>
Author: wangfei <[email protected]>

Closes #15213 from scwf/dag-resubmit.

(cherry picked from commit 46d1203)
Signed-off-by: Shixiong Zhu <[email protected]>
zzcclp added a commit to zzcclp/spark that referenced this pull request Oct 9, 2016
