Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1107,10 +1107,19 @@ private[spark] class TaskSetManager(
def recomputeLocality(): Unit = {
// A zombie TaskSetManager may reach here while executorLost happens
if (isZombie) return
val previousLocalityIndex = currentLocalityIndex
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
val previousMyLocalityLevels = myLocalityLevels
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
if (currentLocalityIndex > previousLocalityIndex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should change this to only happen when executorAdded. this is also called on lost and decommission and it doesn't make sense to go to "lower" level. Note we may want to stay away from saying higher in the comment below. The code values, lower is actually more strict - meaning process is lowest value. so perhaps don't say higher or lower but say more local or less local. Perhaps pass in parameter from executorAdded.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should change this to only happen when executorAdded. this is also called on lost and decommission and it doesn't make sense to go to "lower" level.

I think it's impossible to go to "lower" or more local level in case of lost and decommission. Lost and decommission would remove executors, so the locality levels can only be less compared to the previous locality levels. It also means, lost and decommission will not add new more local levels.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so perhaps don't say higher or lower but say more local or less local.

Yea, good point!

// SPARK-31837: If the new level is more local, shift to the new most local locality
// level in terms of better data locality. For example, say the previous locality
// levels are [PROCESS, NODE, ANY] and current level is ANY. After recompute, the
// locality levels are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level.
currentLocalityIndex = getLocalityIndex(myLocalityLevels.diff(previousMyLocalityLevels).head)
Copy link
Member Author

@Ngone51 Ngone51 May 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi all, there's a defect in the previous implement(always reset currentLocalityIndex to 0). Think about such a case, say we have locality levels [PROCESS, NODE, ANY] and current locality level is ANY. After recompute, we might have locality levels [PROCESS, NODE, RACK, ANY]. In this case, I think we'd better shift to RACK level instead of PROCESS level, since the TaskSetManager has been already delayed for a while on known levels(PROCESS, NODE). So with this update, I think it could also ease our concern on the possible perf regression introduced by aggressive locality level resetting. @bmarcott @tgravescs @cloud-fan

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this is one of the cases I was referring to. Ideally you would never run into this case because a host is on a rack so you would always have it. Unfortunately Spark defaults the rack to None so you can. I was going to improve upon it in the jira I filed. We can certainly handle some here if you want

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can certainly handle some here if you want

What do you mean by "handle some here"? I read your JIRA and don't find the specific solution that could be added to this PR. Could you please elaborate more?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean there are a bunch of corner cases and I don't think resetting this on every executor added is ideal. I did not list out all of them. On Yarn it actually defaults to default-rack rather then None so it actually won't have this issue because every node has a rack. I agree that the code you have here is an improvement to handle the rack case.

}
}

def executorAdded(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,6 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with

test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
initLocalClusterSparkContext(2)
// It's required to reset the delay timer when a task is scheduled, otherwise all the tasks
// could get scheduled at ANY level.
sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
// set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("SPARK-16106 locality levels updated if executor added to existing host") {
val taskScheduler = setupScheduler()

taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0,
(0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ class TaskSetManagerSuite
manager.executorAdded()
sched.addExecutor("execC", "host2")
manager.executorAdded()
assert(manager.resourceOffer("exec1", "host1", ANY)._1.isDefined)
assert(manager.resourceOffer("execB", "host1", ANY)._1.isDefined)
sched.removeExecutor("execA")
manager.executorLost(
"execA",
Expand All @@ -634,6 +634,25 @@ class TaskSetManagerSuite
assert(sched.taskSetsFailed.contains(taskSet.id))
}

test("SPARK-31837: Shift to the new highest locality level if there is when recomputeLocality") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(2,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host1", "execA")))
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
// before any executors are added to TaskScheduler, the manager's
// locality level only has ANY, so tasks can be scheduled anyway.
assert(manager.resourceOffer("execB", "host2", ANY)._1.isDefined)
sched.addExecutor("execA", "host1")
manager.executorAdded()
// after adding a new executor, the manager locality has PROCESS_LOCAL, NODE_LOCAL, ANY.
// And we'll shift to the new highest locality level, which is PROCESS_LOCAL in this case.
assert(manager.resourceOffer("execC", "host3", ANY)._1.isEmpty)
assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined)
}

test("test RACK_LOCAL tasks") {
// Assign host1 to rack1
FakeRackUtil.assignHostToRack("host1", "rack1")
Expand Down