@@ -786,30 +786,13 @@ private[spark] class TaskSchedulerImpl(
       try {
         Option(taskIdToTaskSetManager.get(tid)) match {
           case Some(taskSet) =>
-            if (state == TaskState.LOST) {
-              // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
-              // where each executor corresponds to a single task, so mark the executor as failed.
-              val execId = taskIdToExecutorId.getOrElse(tid, {
-                val errorMsg =
-                  "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"
-                taskSet.abort(errorMsg)
-                throw new SparkException(
-                  "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")
-              })
-              if (executorIdToRunningTaskIds.contains(execId)) {
-                reason = Some(
-                  ExecutorProcessLost(
-                    s"Task $tid was lost, so marking the executor as lost as well."))
-                removeExecutor(execId, reason.get)
-                failedExecutor = Some(execId)
-              }
-            }
+            assert(state != TaskState.LOST)
             if (TaskState.isFinished(state)) {
               cleanupTaskState(tid)
               taskSet.removeRunningTask(tid)
               if (state == TaskState.FINISHED) {
                 taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
-              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+              } else if (Set(TaskState.FAILED, TaskState.KILLED).contains(state)) {
                 taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
               }
             }
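For orientation (not part of the patch): the hunk above relies on TaskState.isFinished already covering every terminal state, which is why the dedicated LOST branch can shrink to an assertion. A minimal sketch of that relationship, with names assumed to mirror org.apache.spark.TaskState rather than copied from it:

// Illustration only: an assumed, simplified model of Spark's terminal-state check,
// not the file under review.
object TaskStateSketch extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  // States from which a task can make no further progress.
  private val finishedStates: Set[Value] = Set(FINISHED, FAILED, KILLED, LOST)

  def isFinished(state: Value): Boolean = finishedStates.contains(state)
}

Since LOST was produced only by the removed Mesos fine-grained mode, statusUpdate can assert that it never arrives and drop it from the set of failed states it routes to taskResultGetter.enqueueFailedTask.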
@@ -1369,42 +1369,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
   }
 
-  test("if a task finishes with TaskState.LOST its executor is marked as dead") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
-      override def executorAdded(execId: String, host: String): Unit = {}
-    }
-
-    val e0Offers = IndexedSeq(WorkerOffer("executor0", "host0", 1))
-    val attempt1 = FakeTask.createTaskSet(1)
-
-    // submit attempt 1, offer resources, task gets scheduled
-    taskScheduler.submitTasks(attempt1)
-    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
-    assert(1 === taskDescriptions.length)
-
-    // Report the task as failed with TaskState.LOST
-    taskScheduler.statusUpdate(
-      tid = taskDescriptions.head.taskId,
-      state = TaskState.LOST,
-      serializedData = ByteBuffer.allocate(0)
-    )
-
-    // Check that state associated with the lost task attempt is cleaned up:
-    assert(taskScheduler.taskIdToExecutorId.isEmpty)
-    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
-    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
-
-    // Check that the executor has been marked as dead
-    assert(!taskScheduler.isExecutorAlive("executor0"))
-    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
-    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
-  }
-
   test("Locality should be used for bulk offers even with delay scheduling off") {
     val conf = new SparkConf()
       .set(config.LOCALITY_WAIT.key, "0")