@@ -82,11 +82,13 @@ private[spark] class TaskSetManager(
val successful = new Array[Boolean](numTasks)
private val numFailures = new Array[Int](numTasks)

// Set the coresponding index of Boolean var when the task killed by other attempt tasks,
// Set the corresponding index of Boolean var when the task killed by other attempt tasks,
// this happened while we set the `spark.speculation` to true. The task killed by others
// should not resubmit while executor lost.
private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)

private val fetchFailedTaskIndexSet = new HashSet[Int]

val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
private[scheduler] var tasksSuccessful = 0
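
For context, here is a minimal self-contained sketch (hypothetical class and method names, not Spark's real TaskSetManager) of how a per-index killedByOtherAttempt flag can gate resubmission when an executor is lost:

```scala
// A simplified stand-in for the bookkeeping above; names and structure are
// illustrative, not Spark's actual implementation.
class ResubmitBookkeeping(numTasks: Int) {
  val successful = new Array[Boolean](numTasks)
  val killedByOtherAttempt = new Array[Boolean](numTasks)

  // On executor loss, a finished map task whose output lived on that executor is
  // normally resubmitted; a task that only "finished" because another (speculative)
  // attempt succeeded elsewhere should not be.
  def shouldResubmit(index: Int, outputWasOnLostExecutor: Boolean): Boolean =
    successful(index) && outputWasOnLostExecutor && !killedByOtherAttempt(index)
}

object ResubmitBookkeepingDemo extends App {
  val b = new ResubmitBookkeeping(numTasks = 2)
  b.successful(0) = true
  b.killedByOtherAttempt(0) = true   // killed because its speculative copy won
  b.successful(1) = true
  println(b.shouldResubmit(0, outputWasOnLostExecutor = true)) // false
  println(b.shouldResubmit(1, outputWasOnLostExecutor = true)) // true
}
```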

@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
if (tasksSuccessful == numTasks) {
isZombie = true
}
} else if (fetchFailedTaskIndexSet.contains(index)) {
Contributor:

Why are you making this change? I don't quite get it.

Member Author:
The fetchFailedTaskIndexSet change is to ignore the task-success event if the stage is already marked as failed, as Wenchen suggested in an earlier comment.

Contributor:
We should handle this case in the DAGScheduler: there we can look up the stage by task id and see whether the stage has failed. Then we don't need fetchFailedTaskIndexSet.
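
A rough sketch of that suggestion, using simplified stand-in types rather than the real DAGScheduler/TaskSetManager APIs: on task completion, look up the stage by id and drop a late success if that stage has already failed.

```scala
import scala.collection.mutable

object StageLookupSketch {
  // Simplified stand-ins for Stage and the completion event; not the real classes.
  final case class Stage(id: Int, var failed: Boolean)
  final case class TaskCompletion(taskId: Long, stageId: Int, successful: Boolean)

  private val stageIdToStage = mutable.HashMap.empty[Int, Stage]

  def handleTaskCompletion(event: TaskCompletion): Unit = {
    stageIdToStage.get(event.stageId) match {
      case Some(stage) if stage.failed && event.successful =>
        // The stage already hit a FetchFailed: ignore the late success so its map
        // output is not registered again for a stage attempt that is being retried.
        println(s"Ignoring success of task ${event.taskId} for failed stage ${stage.id}")
      case Some(stage) =>
        println(s"Processing completion of task ${event.taskId} for stage ${stage.id}")
      case None =>
        println(s"Unknown stage ${event.stageId}, dropping event")
    }
  }

  def main(args: Array[String]): Unit = {
    stageIdToStage(1) = Stage(1, failed = true)
    handleTaskCompletion(TaskCompletion(taskId = 42L, stageId = 1, successful = true))
  }
}
```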

Member Author (@xuanyuanking, Apr 18, 2018):
Many thanks to you both for the guidance; that is clearer, and the UT added to reproduce this problem can also be used to check it!

logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already failed by FetchFailed")
return
Member:
We cannot simply return here. We should always send a task CompletionEvent to the DAGScheduler, in case any listeners are waiting for it.

Member:
Maybe we can mark the task as FAILED with UnknownReason here. Then the DAGScheduler will treat this task as a no-op, and registerMapOutput will not be triggered. Though it is not an elegant way.
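
A hedged sketch of that idea with stand-in types (TaskEndReason, UnknownReason, and CompletionEvent are re-declared here so the example is self-contained; they are not the real Spark classes): every finished task still produces a completion event, but a non-Success reason keeps the success path from running.

```scala
object CompletionEventSketch {
  // Simplified stand-ins mirroring Spark names, declared locally for the sketch.
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case object UnknownReason extends TaskEndReason

  final case class CompletionEvent(taskIndex: Int, reason: TaskEndReason)

  // Stand-in for "notify the DAGScheduler / listeners": every finished task
  // emits exactly one completion event.
  def taskEnded(event: CompletionEvent): Unit =
    println(s"listener saw task ${event.taskIndex} end with ${event.reason}")

  def handleSuccessfulTask(index: Int, alreadyFetchFailed: Boolean): Unit = {
    if (alreadyFetchFailed) {
      // Do NOT simply return: listeners still expect an event, but a non-Success
      // reason keeps the success path (e.g. map-output registration) from running.
      taskEnded(CompletionEvent(index, UnknownReason))
    } else {
      taskEnded(CompletionEvent(index, Success))
    }
  }

  def main(args: Array[String]): Unit = {
    handleSuccessfulTask(index = 3, alreadyFetchFailed = true)
    handleSuccessfulTask(index = 4, alreadyFetchFailed = false)
  }
}
```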

Member Author:

Yep, as @cloud-fan suggested, handling this in the DAGScheduler is a better choice.

} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
@@ -793,6 +799,7 @@ private[spark] class TaskSetManager(
blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
}
fetchFailedTaskIndexSet.add(index)

None

@@ -745,7 +745,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(resubmittedTasks === 0)
}


test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") {
val conf = new SparkConf().set("spark.speculation", "true")
sc = new SparkContext("local", "test", conf)