Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private[spark] class TaskSetManager(

/** Check whether a task is currently running an attempt on a given host */
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
taskAttempts(taskIndex).exists(_.host == host)
taskAttempts(taskIndex).exists { info => info.running && info.host == host }
}

private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,59 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}

test("speculative task should not run on a given host where another attempt " +
"is already running on") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(
sc, ("execA", "host1"), ("execB", "host2"))
val taskSet = FakeTask.createTaskSet(1,
Seq(TaskLocation("host1", "execA"), TaskLocation("host2", "execB")))
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

// let task0.0 run on host1
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index == 0)
val info1 = manager.taskAttempts(0)(0)
assert(info1.running === true)
assert(info1.host === "host1")

// long time elapse, and task0.0 is still running,
// so we launch a speculative task0.1 on host2
clock.advance(1000)
manager.speculatableTasks += 0
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 0)
val info2 = manager.taskAttempts(0)(0)
assert(info2.running === true)
assert(info2.host === "host2")
assert(manager.speculatableTasks.size === 0)

// now, task0 has two copies running on host1, host2 separately,
// so we can not launch a speculative task on any hosts.
manager.speculatableTasks += 0
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) === None)
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None)
assert(manager.speculatableTasks.size === 1)

// after a long long time, task0.0 failed, and task0.0 can not re-run since
// there's already a running copy.
clock.advance(1000)
info1.finishTime = clock.getTimeMillis()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be better here for you to call manager.handleFailedTask, to more accurately simulate the real behavior, and also makes the purpose of a test a little more clear.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice suggestion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you shouldn't need to set info.finishTime anymore, that should be taken care of by manager.handleFailedTask.

assert(info1.running === false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert(!info1.running)


// time goes on, and task0.1 is still running
clock.advance(1000)
// so we try to launch a new speculative task
// we can not run it on host2, because task0.1 is already running on
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None)
// we successfully launch a speculative task0.2 on host1, since there's
// no more running copy of task0
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
val info3 = manager.taskAttempts(0)(0)
assert(info3.running === true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert(info3.running)

assert(info3.host === "host1")
assert(manager.speculatableTasks.size === 0)
}

test("node-local tasks should be scheduled right away " +
"when there are only node-local and no-preference tasks") {
sc = new SparkContext("local", "test")
Expand Down