diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 43193dc5366a4..13d2d650fcee0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -786,30 +786,13 @@ private[spark] class TaskSchedulerImpl(
       try {
         Option(taskIdToTaskSetManager.get(tid)) match {
           case Some(taskSet) =>
-            if (state == TaskState.LOST) {
-              // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
-              // where each executor corresponds to a single task, so mark the executor as failed.
-              val execId = taskIdToExecutorId.getOrElse(tid, {
-                val errorMsg =
-                  "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"
-                taskSet.abort(errorMsg)
-                throw new SparkException(
-                  "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")
-              })
-              if (executorIdToRunningTaskIds.contains(execId)) {
-                reason = Some(
-                  ExecutorProcessLost(
-                    s"Task $tid was lost, so marking the executor as lost as well."))
-                removeExecutor(execId, reason.get)
-                failedExecutor = Some(execId)
-              }
-            }
+            assert(state != TaskState.LOST)
             if (TaskState.isFinished(state)) {
               cleanupTaskState(tid)
               taskSet.removeRunningTask(tid)
               if (state == TaskState.FINISHED) {
                 taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
-              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+              } else if (Set(TaskState.FAILED, TaskState.KILLED).contains(state)) {
                 taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
               }
             }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 5cc97410bcce6..9007ae4e0990a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1369,42 +1369,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
   }
 
-  test("if a task finishes with TaskState.LOST its executor is marked as dead") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
-      override def executorAdded(execId: String, host: String): Unit = {}
-    }
-
-    val e0Offers = IndexedSeq(WorkerOffer("executor0", "host0", 1))
-    val attempt1 = FakeTask.createTaskSet(1)
-
-    // submit attempt 1, offer resources, task gets scheduled
-    taskScheduler.submitTasks(attempt1)
-    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
-    assert(1 === taskDescriptions.length)
-
-    // Report the task as failed with TaskState.LOST
-    taskScheduler.statusUpdate(
-      tid = taskDescriptions.head.taskId,
-      state = TaskState.LOST,
-      serializedData = ByteBuffer.allocate(0)
-    )
-
-    // Check that state associated with the lost task attempt is cleaned up:
-    assert(taskScheduler.taskIdToExecutorId.isEmpty)
-    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
-    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
-
-    // Check that the executor has been marked as dead
-    assert(!taskScheduler.isExecutorAlive("executor0"))
-    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
-    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
-  }
-
   test("Locality should be used for bulk offers even with delay scheduling off") {
     val conf = new SparkConf()
       .set(config.LOCALITY_WAIT.key, "0")