Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,9 @@ class DAGScheduler(
}
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else if (failedStages.contains(shuffleStage)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we only have a problem with shuffle map task not result task?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also confuse me before, as far as I'm concerned, the result task in such scenario(speculative task fail but original task success) is ok because it has no child stage, we can use the success task's result and markStageAsFinished. But for shuffle map task, it will cause inconformity between mapOutputTracker and stage's pendingPartitions, it must fix.
I'm not sure of ResultTask's behavior, can you give some advice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I may nitpick kere. Can you simulate what happens to result task if FechFaileded comes before task success?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems we may mistakenly mark a job as finished?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I may nitpick here.

No, that's necessary, I should have to make sure about this, thanks for your advice! :)

Can you simulate what happens to result task if FechFaileded comes before task success?

Sure, but it maybe hardly to reproduce this in real env, I'll try to fake it on UT first ASAP.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the UT for simulating this scenario happens to result task.

logInfo(s"Ignoring task $smt because of stage $shuffleStage have " +
s"been marked as failed")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private[spark] class TaskSetManager(
val successful = new Array[Boolean](numTasks)
private val numFailures = new Array[Int](numTasks)

// Set the coresponding index of Boolean var when the task killed by other attempt tasks,
// Set the corresponding index of Boolean var when the task killed by other attempt tasks,
// this happened while we set the `spark.speculation` to true. The task killed by others
// should not resubmit while executor lost.
private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
}

/**
* This tests the case where origin task success after speculative task got FetchFailed
* before.
*/
test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" +
" never succeed") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the test name should describe the expected behavior not the wrong one.
SPARK-23811: staged failed by FetchFailed should ignore following successful tasks

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll change it.

// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
val shuffleIdA = shuffleDepA.shuffleId

val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

submit(rddC, Array(0, 1))

// Complete both tasks in rddA.
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))

// The first task success
runEvent(makeCompletionEvent(
taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

// The second task's speculative attempt fails first, but task self still running.
// This may caused by ExecutorLost.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm not very familiar with this test suite, how can you tell it's a speculative task?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we only need to mock the speculative task failed event came before success event, makeCompletionEvent with same taskSets's task can achieve such goal. This also use in task events always posted in speculation / when stage is killed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, you can runEvent(SpeculativeTaskSubmitted) first to simulate a speculative task submitted before you runEvent(makeCompletetionEvent()).

FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
null))
// Check currently missing partition.
assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
// The second result task self success soon.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
// Missing partition number should not change, otherwise it will cause child stage
// never succeed.
assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(resubmittedTasks === 0)
}


test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") {
val conf = new SparkConf().set("spark.speculation", "true")
sc = new SparkContext("local", "test", conf)
Expand Down