From e15b2abedb6fcaf6bac8775f15bdd246fa22902e Mon Sep 17 00:00:00 2001 From: GavinGavinNo1 Date: Wed, 8 Feb 2017 22:51:59 +0800 Subject: [PATCH 1/4] Resolve stage hanging up problem in a particular case --- .../spark/scheduler/TaskSetManager.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 52 ++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3b25513bea057..081901f48438e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -874,7 +874,8 @@ private[spark] class TaskSetManager( // and we are not using an external shuffle server which could serve the shuffle outputs. // The reason is the next stage wouldn't be able to fetch the data from this dead executor // so we would need to rerun these tasks on other executors. - if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) { + if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled + && !isZombie) { for ((tid, info) <- taskInfos if info.executorId == execId) { val index = taskInfos(tid).index if (successful(index)) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ffb9fe461a486..0c48ec033f1f8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.util.Random +import java.util.{Properties, Random} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, never, spy, verify, when} import org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.internal.Logging +import org.apache.spark.serializer.SerializerInstance import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AccumulatorV2, ManualClock} @@ -664,6 +665,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize")) } + test("taskSetManager should not send Resubmitted tasks after being a zombie") { + // Regression test for SPARK-13931 + val conf = new SparkConf().set("spark.speculation", "true") + sc = new SparkContext("local", "test", conf) + + val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) + sched.initialize(new FakeSchedulerBackend() { + override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {} + }) + + // count for Resubmitted tasks + var resubmittedTasks = 0 + val dagScheduler = new FakeDAGScheduler(sc, sched) { + override def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo): Unit = { + super.taskEnded(task, reason, result, accumUpdates, taskInfo) + reason match { + case Resubmitted => resubmittedTasks += 1 + case _ => + } + } + } + sched.setDAGScheduler(dagScheduler) + + val tasks = Array.tabulate[Task[_]](1) { i => + new ShuffleMapTask(i, 0, null, new Partition { + override def index: Int = 0 + }, Seq(TaskLocation("host1", "execA")), new Properties, null) + } + val taskSet = 
new TaskSet(tasks, 0, 0, 0, null) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + manager.speculatableTasks += tasks.head.partitionId + val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get + val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get + + assert(manager.runningTasks == 2) + assert(manager.isZombie == false) + + val directTaskResult = new DirectTaskResult[String](null, Seq()) { + override def value(resultSer: SerializerInstance): String = "" + } + manager.handleSuccessfulTask(task1.taskId, directTaskResult) + assert(manager.isZombie == true) + assert(resubmittedTasks == 0) + + manager.executorLost("execB", "host2", new SlaveLost()) + assert(resubmittedTasks == 0) + } + test("speculative and noPref task should be scheduled after node-local") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler( From 24d8d795d26a5b1477cac01f2748c25fb9b74dc5 Mon Sep 17 00:00:00 2001 From: GavinGavinNo1 Date: Fri, 10 Feb 2017 01:02:13 +0800 Subject: [PATCH 2/4] optimization --- .../spark/scheduler/TaskSetManager.scala | 2 +- .../spark/scheduler/TaskSetManagerSuite.scala | 28 ++++++++++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 081901f48438e..f0e0d1cc3f126 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -875,7 +875,7 @@ private[spark] class TaskSetManager( // The reason is the next stage wouldn't be able to fetch the data from this dead executor // so we would need to rerun these tasks on other executors. if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled - && !isZombie) { + && !isZombie) { for ((tid, info) <- taskInfos if info.executorId == execId) { val index = taskInfos(tid).index if (successful(index)) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 0c48ec033f1f8..46b3ce87e7e11 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -689,29 +689,37 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } sched.setDAGScheduler(dagScheduler) - val tasks = Array.tabulate[Task[_]](1) { i => - new ShuffleMapTask(i, 0, null, new Partition { + val singleTask = new ShuffleMapTask(0, 0, null, new Partition { override def index: Int = 0 }, Seq(TaskLocation("host1", "execA")), new Properties, null) - } - val taskSet = new TaskSet(tasks, 0, 0, 0, null) + val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - manager.speculatableTasks += tasks.head.partitionId + + // Offer host1, which should be accepted as a PROCESS_LOCAL location + // by the one task in the task set val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get + + // Mark the task as available for speculation, and then offer another resource, + // which should be used to launch a speculative copy of the task. 
+ manager.speculatableTasks += singleTask.partitionId val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get - assert(manager.runningTasks == 2) - assert(manager.isZombie == false) + assert(manager.runningTasks === 2) + assert(manager.isZombie === false) val directTaskResult = new DirectTaskResult[String](null, Seq()) { override def value(resultSer: SerializerInstance): String = "" } + // Complete one copy of the task, which should result in the task set manager + // being marked as a zombie, because at least one copy of its only task has completed. manager.handleSuccessfulTask(task1.taskId, directTaskResult) - assert(manager.isZombie == true) - assert(resubmittedTasks == 0) + assert(manager.isZombie === true) + assert(resubmittedTasks === 0) + assert(manager.runningTasks === 1) manager.executorLost("execB", "host2", new SlaveLost()) - assert(resubmittedTasks == 0) + assert(manager.runningTasks === 0) + assert(resubmittedTasks === 0) } test("speculative and noPref task should be scheduled after node-local") { From 3b73476e2489bf9cfac7de18d135ffe7f5021fe5 Mon Sep 17 00:00:00 2001 From: 16092929 <16092929@cnsuning.com> Date: Tue, 28 Feb 2017 16:30:29 +0800 Subject: [PATCH 3/4] add comment for defining a DAGScheduler --- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 46b3ce87e7e11..fa11d8a07126b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -665,8 +665,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize")) } - test("taskSetManager should not send Resubmitted tasks after being a zombie") { - // Regression test for SPARK-13931 + test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") { val conf = new SparkConf().set("spark.speculation", "true") sc = new SparkContext("local", "test", conf) @@ -675,7 +674,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {} }) - // count for Resubmitted tasks + // Keep track of the number of tasks that are resubmitted, + // so that the test can check that no tasks were resubmitted. 
var resubmittedTasks = 0 val dagScheduler = new FakeDAGScheduler(sc, sched) { override def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, From 1c3a6ff261a0a4e2b82f0fd7f985e7e14b93f37d Mon Sep 17 00:00:00 2001 From: 16092929 <16092929@cnsuning.com> Date: Wed, 1 Mar 2017 10:57:14 +0800 Subject: [PATCH 4/4] format code --- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index fa11d8a07126b..0883338d0aa0c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -678,8 +678,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // so that the test can check that no tasks were resubmitted. var resubmittedTasks = 0 val dagScheduler = new FakeDAGScheduler(sc, sched) { - override def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo): Unit = { + override def taskEnded( + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + taskInfo: TaskInfo): Unit = { super.taskEnded(task, reason, result, accumUpdates, taskInfo) reason match { case Resubmitted => resubmittedTasks += 1
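
For readers skimming the series, here is a minimal, self-contained Scala sketch of the guard that PATCH 1/4 introduces. None of the names below (ZombieGuardSketch, FakeTaskSetManager, FakeTaskInfo) exist in Spark; they are hypothetical stand-ins for just enough TaskSetManager state that the added `!isZombie` check in executorLost can be exercised in isolation, under the same assumption the patch states: resubmission only matters for shuffle-map tasks when no external shuffle service can serve the lost outputs.

// A minimal, self-contained sketch (hypothetical names, not Spark's real classes)
// of the condition PATCH 1/4 adds to TaskSetManager.executorLost: finished
// shuffle-map tasks are resubmitted only while the manager is still live
// (!isZombie), in addition to the pre-existing checks.
import scala.collection.mutable

object ZombieGuardSketch {

  final case class FakeTaskInfo(taskId: Long, index: Int, executorId: String)

  // Stands in for the handful of TaskSetManager fields the guard reads.
  class FakeTaskSetManager(isShuffleMapStage: Boolean, externalShuffleServiceEnabled: Boolean) {
    var isZombie: Boolean = false
    val taskInfos = mutable.Map.empty[Long, FakeTaskInfo]
    val successful = mutable.Set.empty[Int]
    var resubmitted = 0

    def executorLost(execId: String): Unit = {
      // Mirrors the patched condition: resubmit only if this is a shuffle-map
      // stage, there is no external shuffle service to serve the lost outputs,
      // AND the manager is not already a zombie.
      if (isShuffleMapStage && !externalShuffleServiceEnabled && !isZombie) {
        for ((_, info) <- taskInfos if info.executorId == execId && successful(info.index)) {
          successful -= info.index
          resubmitted += 1
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new FakeTaskSetManager(isShuffleMapStage = true, externalShuffleServiceEnabled = false)
    m.taskInfos(1L) = FakeTaskInfo(1L, 0, "execB") // speculative copy ran on execB
    m.successful += 0                              // partition 0 already finished elsewhere
    m.isZombie = true                              // every partition is accounted for
    m.executorLost("execB")
    assert(m.resubmitted == 0)                     // with the guard, nothing is resubmitted
    println(s"resubmitted = ${m.resubmitted}")
  }
}

The scenario in main mirrors the regression test above: one copy of the only task has already succeeded (making the manager a zombie), a speculative copy was running on the executor that is then lost, and with the guard in place no Resubmitted event is produced for the already-completed stage.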