@@ -874,7 +874,8 @@ private[spark] class TaskSetManager(
// and we are not using an external shuffle server which could serve the shuffle outputs.
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
-if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
+if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
+  && !isZombie) {

Contributor:
nit: indentation (add two spaces)
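That is, presumably the continuation line indented four spaces from the "if" (two more than in the diff above):

    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
        && !isZombie) {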

Contributor:

Also I'm concerned that we might need some of the functionality below even when the TSM is a zombie. While the TSM shouldn't tell the DAGScheduler that the task was resubmitted, I think it does need to notify the DAGScheduler that tasks on the executor are finished (otherwise they'll never be marked as finished in the UI, for example), and I also think it needs to properly update the running copies of the task.
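For reference, a rough sketch of the loop body in question, paraphrased from TaskSetManager.executorLost of this era (details may differ from the exact source):

    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = taskInfos(tid).index
      if (successful(index)) {
        // Forget the completed copy and re-queue the task...
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // ...and tell the DAGScheduler the task was resubmitted. Gating the
        // whole block on !isZombie also suppresses this notification, which
        // is what the reviewer is concerned about.
        sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, Seq.empty, info)
      }
    }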

Author:

@kayousterhout I think that when the TSM is a zombie, resubmitted tasks won't be offered or executed, so there's no need to notify the DAGScheduler that the tasks are finished.

for ((tid, info) <- taskInfos if info.executorId == execId) {
  val index = taskInfos(tid).index
  if (successful(index)) {
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler

-import java.util.Random
+import java.util.{Properties, Random}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, never, spy, verify, when}
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ManualClock}

@@ -664,6 +665,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
}

test("taskSetManager should not send Resubmitted tasks after being a zombie") {

Contributor:
can you make the description here "[SPARK-13931] taskSetManager ..." (and then eliminate the comment below)
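That is, presumably:

    test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") {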

// Regression test for SPARK-13931
val conf = new SparkConf().set("spark.speculation", "true")
sc = new SparkContext("local", "test", conf)

val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
sched.initialize(new FakeSchedulerBackend() {
  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {}
})

// count for Resubmitted tasks
var resubmittedTasks = 0
val dagScheduler = new FakeDAGScheduler(sc, sched) {

Contributor:
Rather than defining your own DAGScheduler, can you use the existing FakeDAGScheduler, and then use the FakeTaskScheduler to make sure that the task was recorded as ended for the correct reason (i.e., not for the reason of being resubmitted)?
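A sketch of the kind of check being suggested, using the end reasons that FakeTaskScheduler records (the author's snippet further down contains the same assertions):

    assert(sched.endedTasks(0) === Success)
    // ...and after the executor running the speculative copy is lost:
    assert(sched.endedTasks(0).isInstanceOf[ExecutorLostFailure])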

Author:
@kayousterhout if I use the existing FakeDAGScheduler, I'll have to remove the variable 'resubmittedTasks', and then I can't make this test fail before my code change.

Author (GavinGavinNo1, Feb 13, 2017):

I still don't understand. I'm confused about how to construct a test case that fails before the code change, if I modify it as below.

test("taskSetManager should not send Resubmitted tasks after being a zombie") {
	// Regression test for SPARK-13931
	val conf = new SparkConf().set("spark.speculation", "true")
	sc = new SparkContext("local", "test", conf)

	val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
	sched.initialize(new FakeSchedulerBackend() {
	  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {}
	})

	val dagScheduler = new FakeDAGScheduler(sc, sched)
	sched.setDAGScheduler(dagScheduler)

	val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
		override def index: Int = 0
	  }, Seq(TaskLocation("host1", "execA")), new Properties, null)
	val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)
	val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

	// Offer host1, which should be accepted as a PROCESS_LOCAL location
	// by the one task in the task set
	val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get

	// Mark the task as available for speculation, and then offer another resource,
	// which should be used to launch a speculative copy of the task.
	manager.speculatableTasks += singleTask.partitionId
	val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get

	assert(manager.runningTasks === 2)
	assert(manager.isZombie === false)

	val directTaskResult = new DirectTaskResult[String](null, Seq()) {
	  override def value(resultSer: SerializerInstance): String = ""
	}
	// Complete one copy of the task, which should result in the task set manager
	// being marked as a zombie, because at least one copy of its only task has completed.
	manager.handleSuccessfulTask(task1.taskId, directTaskResult)
	assert(manager.isZombie === true)
	assert(sched.endedTasks(0) === Success)
	assert(manager.runningTasks === 1)

	manager.executorLost("execB", "host2", new SlaveLost())
	assert(manager.runningTasks === 0)
	assert(sched.endedTasks(0).isInstanceOf[ExecutorLostFailure])
}

Contributor:

I see. I played around with this a bit, and the problem is that the TaskSetManager also sends an ExecutorLost task failure for the task that gets resubmitted, so that failure overrides the saved Resubmitted task end reason. It's fine to leave the existing test, but can you just add a comment that says something like "Keep track of the number of tasks that are resubmitted, so that the test can check that no tasks were resubmitted."
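In code, the suggested comment would look something like:

    // Keep track of the number of tasks that are resubmitted,
    // so that the test can check that no tasks were resubmitted.
    var resubmittedTasks = 0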

  override def taskEnded(task: Task[_], reason: TaskEndReason, result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo): Unit = {

Contributor:
Can you fix the indentation here? (the wrapped parameters should be indented 4 extra spaces from "override")
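That is:

    override def taskEnded(task: Task[_], reason: TaskEndReason, result: Any,
        accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo): Unit = {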

    super.taskEnded(task, reason, result, accumUpdates, taskInfo)
    reason match {
      case Resubmitted => resubmittedTasks += 1
      case _ =>
    }
  }
}
sched.setDAGScheduler(dagScheduler)

val tasks = Array.tabulate[Task[_]](1) { i =>

Contributor:
do you need Array.tabulate here, given that you're only creating one task / task location?
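A sketch of the direct construction (essentially what the author's earlier snippet does):

    val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
      override def index: Int = 0
    }, Seq(TaskLocation("host1", "execA")), new Properties, null)
    val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)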

  new ShuffleMapTask(i, 0, null, new Partition {
    override def index: Int = 0
  }, Seq(TaskLocation("host1", "execA")), new Properties, null)
}
val taskSet = new TaskSet(tasks, 0, 0, 0, null)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
manager.speculatableTasks += tasks.head.partitionId

Contributor:
can you add a comment here about what's going on? I think it would be more clear if you moved this line below task 1. Then before task 1, add a comment saying "Offer host1, which should be accepted as a PROCESS_LOCAL location by the one task in the task set". Then, before this speculatableTasks line, add something like "Mark the task as available for speculation, and then offer another resource, which should be used to launch a speculative copy of the task."
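Rearranged as suggested (matching the author's earlier snippet):

    // Offer host1, which should be accepted as a PROCESS_LOCAL location
    // by the one task in the task set
    val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get

    // Mark the task as available for speculation, and then offer another resource,
    // which should be used to launch a speculative copy of the task.
    manager.speculatableTasks += tasks.head.partitionId
    val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get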

val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get
val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get

assert(manager.runningTasks == 2)

Contributor:
can you use triple equals here and below? That way ScalaTest will print out the expected and actual values automatically.
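For example:

    assert(manager.runningTasks === 2)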

assert(manager.isZombie == false)

val directTaskResult = new DirectTaskResult[String](null, Seq()) {

Contributor:
here, can you add a comment with something like "Complete one copy of the task, which should result in the task set manager being marked as a zombie, because at least one copy of its only task has completed."
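That is:

    // Complete one copy of the task, which should result in the task set manager
    // being marked as a zombie, because at least one copy of its only task has completed.
    manager.handleSuccessfulTask(task1.taskId, directTaskResult)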

  override def value(resultSer: SerializerInstance): String = ""
}
manager.handleSuccessfulTask(task1.taskId, directTaskResult)
assert(manager.isZombie == true)
assert(resubmittedTasks == 0)

Contributor:
can you check that manager.runningTasks is 1 here, and 0 below?
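That is (as in the author's earlier snippet):

    manager.handleSuccessfulTask(task1.taskId, directTaskResult)
    assert(manager.runningTasks === 1)

    manager.executorLost("execB", "host2", new SlaveLost())
    assert(manager.runningTasks === 0)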


manager.executorLost("execB", "host2", new SlaveLost())
assert(resubmittedTasks == 0)
}

test("speculative and noPref task should be scheduled after node-local") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(