Conversation

@Ngone51 (Member) commented May 27, 2020

What changes were proposed in this pull request?

This PR proposes to shift to the most local locality level if any new, more local locality levels are added during TaskSetManager.recomputeLocality.

Why are the changes needed?

There's a race condition between resourceOffers and submitTasks. If submitTasks happens before resourceOffers, especially when no executors have been added to TaskSchedulerImpl at all, the TaskSetManager's myLocalityLevels will only contain the ANY locality level, see:

private[scheduler] var myLocalityLevels = computeValidLocalityLevels()

And then, resourceOffers is called with new executors added to TaskSchedulerImpl, and recomputeLocality is called because of executorAdded. During recomputeLocality, the TaskSetManager's myLocalityLevels might become PROCESS_LOCAL, NODE_LOCAL, ANY (because at this time we can find alive executors in TaskSchedulerImpl). But the TaskSetManager will stick to the previous locality level, which is ANY, see:

def recomputeLocality(): Unit = {
  // A zombie TaskSetManager may reach here while executorLost happens
  if (isZombie) return
  val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  myLocalityLevels = computeValidLocalityLevels()
  localityWaits = myLocalityLevels.map(getLocalityWait)
  currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}

As a result, in the first round of resourceOffers, the new version of delay scheduling won't take effect and the TaskSetManager can only schedule tasks at the ANY level.

Please note that the problem also exists in the old version of delay scheduling, but the impact there is minor because we always reset the locality level after successfully launching a task, which no longer happens in the new version of delay scheduling.
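
For reference, here is a rough sketch of the adjusted recomputeLocality that this PR converges on, assembled from the snippets quoted in the review below (it may not match the merged diff exactly):

def recomputeLocality(): Unit = {
  // A zombie TaskSetManager may reach here while executorLost happens
  if (isZombie) return
  val previousLocalityIndex = currentLocalityIndex
  val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  val previousMyLocalityLevels = myLocalityLevels
  myLocalityLevels = computeValidLocalityLevels()
  localityWaits = myLocalityLevels.map(getLocalityWait)
  currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
  if (currentLocalityIndex > previousLocalityIndex) {
    // SPARK-31837: If the new level is more local, shift to the new most local locality
    // level in terms of better data locality. For example, say the previous locality
    // levels are [PROCESS, NODE, ANY] and the current level is ANY. After recompute, the
    // locality levels are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level.
    currentLocalityIndex = getLocalityIndex(myLocalityLevels.diff(previousMyLocalityLevels).head)
  }
}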

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Updated existing tests and added a new test.

@Ngone51 (Member, Author) commented May 27, 2020

ping @bmarcott @cloud-fan @tgravescs

also cc @jiangxb1987 @dongjoon-hyun since this is the root cause of the flakiness in #28584

if (currentLocalityIndex > previousLocalityIndex) {
  // SPARK-31837: there's new higher level locality, so shift to
  // the highest locality level in terms of better data locality
  currentLocalityIndex = 0
Comment from @Ngone51 (Member, Author) on the diff above:

My only concern with this change is whether, in the general case, the task set would spend more time on delay scheduling and thus reduce the throughput of the cluster. But considering the improvement we gain from the new version of delay scheduling (which increases cluster throughput), I think it should not be a big problem.

Reply from a Contributor:

I have concerns with this. This is going to reset it back to the highest level all the time, and then you will have to work your way back through the levels, waiting at each one. This could really delay things a lot if you are already down to the ANY level and have to wait 3 seconds between each step. The problem you are describing only occurs on startup when you have no executors, correct? Perhaps we can look at something more specific for that.
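
For context, the 3-second waits mentioned here come from Spark's spark.locality.wait setting, with optional per-level overrides such as spark.locality.wait.node. A minimal, hedged sketch of tuning them in an application, in case the extra delay scheduling ever becomes a concern (the values are purely illustrative):

import org.apache.spark.sql.SparkSession

// Shrink the per-level delay-scheduling waits so that working back through the
// locality levels costs less wall-clock time.
val spark = SparkSession.builder()
  .appName("locality-wait-example")
  .config("spark.locality.wait", "1s")          // base wait applied at each level (default 3s)
  .config("spark.locality.wait.node", "500ms")  // override for the NODE_LOCAL level
  .getOrCreate()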

localityWaits = myLocalityLevels.map(getLocalityWait)
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
if (currentLocalityIndex > previousLocalityIndex) {
  // SPARK-31837: there's new higher level locality, so shift to
Comment from a Contributor on the diff above:

level locality -> locality level

@tgravescs (Contributor):

Can you describe the problem in more detail? Is it that these tasks come in with preferred locality, but by the time you get an executor the timeout has already elapsed for each of process, node, and rack, so you end up at ANY?

@cloud-fan (Contributor) commented May 27, 2020

This reminds me of #27207 (comment)

It's not a big problem, as we only violate delay scheduling in the first resource offer. But still good to fix it.

@Ngone51 (Member, Author) commented May 27, 2020

@tgravescs
The problem is caused by the race condition between resourceOffers and submitTasks. If submitTasks happens before resourceOffers, especially when no executors have been added to TaskScheduler at all, the TaskSetManager's myLocalityLevels will only contain the ANY locality level, see:

private[scheduler] var myLocalityLevels = computeValidLocalityLevels()

And then, resourceOffers is called with new executors added to TaskScheduler, and recomputeLocality is called because of executorAdded. During recomputeLocality, the TaskSetManager's myLocalityLevels may become PROCESS_LOCAL, NODE_LOCAL, ANY (because at this time we have living executors). But the TaskSetManager will stick to the previous locality level, which is ANY, see:

def recomputeLocality(): Unit = {
  // A zombie TaskSetManager may reach here while executorLost happens
  if (isZombie) return
  val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  myLocalityLevels = computeValidLocalityLevels()
  localityWaits = myLocalityLevels.map(getLocalityWait)
  currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}

As a result, the TaskSetManager can only schedule tasks at ANY level.

(I'll update the PR description with a more detailed explanation later.)

@tgravescs (Contributor):

Right, when you have no executors yet it gets set to ANY, but this wasn't changed; it was like that before (including before the locality changes), and when an executor was added it never reset back to the highest level, it just called recomputeLocality again, which would have left it at ANY as well.
I'm assuming the difference here is that previously it would reset as soon as any task got assigned at a higher locality. Note that wasn't necessarily all the way back to 0, though.

The issue, I think, with resetting it on every recomputeLocality is when you have dynamic allocation and you are adding a bunch of executors. You basically revert back to something similar to the old behavior, where you could really be delaying tasks instead of using executors you already have. You are delaying tasks that could be running longer than we should, and wasting the executor resources you have.

let me look into this a bit more.

@cloud-fan (Contributor):

when you have dynamic allocation and you are adding a bunch of executors.

Newly allocated executors mostly don't have any local data to trigger the level reset. To me, this is mostly about the first resource offer to a task set manager.

@tgravescs (Contributor):

That isn't true if you are on something like YARN and HDFS, where data can already be on the node.

Ideally, I think we should reset it on executorAdded only when there weren't any executors before, or when all existing executors were in use and we added one. I just need time to look through the code.

@SparkQA commented May 27, 2020

Test build #123195 has finished for PR 28656 at commit cdc9706.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 27, 2020

Test build #123193 has finished for PR 28656 at commit c38ace4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor) left a review comment:

So another thought here is to only reset this when the locality levels actually changed. Meaning, if we already have the levels NODE and ANY, we don't need to reset, because we already waited out the node wait. We only need to set it back to index 0 when we were at just ANY and have now added NODE, i.e. when the newly computed levels are more local than the current ones. Even that may be a bit aggressive, but I think it's better than resetting any time we are at a less local state.

This gave me a few ideas, though, and I thought of a few corner cases we should try to handle better. I can file a follow-on JIRA for that.

myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
if (currentLocalityIndex > previousLocalityIndex) {
Comment from a Contributor on the diff above:

I think we should change this to only happen when executorAdded. This is also called on lost and decommission, and it doesn't make sense to go to a "lower" level there. Note we may want to stay away from saying "higher" in the comment below: in terms of the code values, lower is actually more strict, meaning PROCESS is the lowest value. So perhaps don't say higher or lower, but say more local or less local. Perhaps pass in a parameter from executorAdded.

Reply from @Ngone51 (Member, Author):

I think we should change this to only happen when executorAdded. This is also called on lost and decommission, and it doesn't make sense to go to a "lower" level there.

I think it's impossible to go to a "lower" (more local) level in the case of lost or decommission. Lost and decommission remove executors, so the locality levels can only shrink compared to the previous levels. In other words, lost and decommission will not add new, more local levels.

Reply from @Ngone51 (Member, Author):

So perhaps don't say higher or lower, but say more local or less local.

Yea, good point!

@Ngone51 (Member, Author) commented May 28, 2020
So another thought here is to only reset this when the locality levels actually changed.

I think the current condition (if (currentLocalityIndex > previousLocalityIndex)) already satisfies that requirement. If currentLocalityIndex becomes bigger than previousLocalityIndex, it means new locality levels have been added before the current locality level.
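
To make this concrete, here is a small, self-contained sketch; getLocalityIndexLike is a hypothetical stand-in for TaskSetManager.getLocalityIndex, and plain strings stand in for the TaskLocality values:

// Simplified stand-in: locate the previous locality level in the recomputed list.
def getLocalityIndexLike(levels: Seq[String], level: String): Int =
  levels.indexOf(level)

val previousLevels   = Seq("ANY")                                // previousLocalityIndex = 0
val recomputedLevels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "ANY") // levels after executorAdded

val previousLocalityIndex = 0
val currentLocalityIndex  = getLocalityIndexLike(recomputedLevels, "ANY")  // 2

// 2 > 0: more local levels were inserted before the previous level, so the reset triggers.
assert(currentLocalityIndex > previousLocalityIndex)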

@SparkQA commented May 28, 2020

Test build #123225 has finished for PR 28656 at commit 312700a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor):

Ah, sorry, I missed that; it is covered. This looks good to me then. I can file another JIRA to investigate possibly handling this better.

localityWaits = myLocalityLevels.map(getLocalityWait)
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
if (currentLocalityIndex > previousLocalityIndex) {
  // SPARK-31837: there's new higher locality level, so shift to
Comment from a Contributor on the diff above:

How about: "If the new level is more local, shift to the most local level"?

@cloud-fan (Contributor):

Can we also remove the hack mentioned in #27207 (comment)?

@bmarcott (Contributor) commented May 28, 2020

@Ngone51
Thanks for looking into this.

Let's make a few corrections to the description:

  • This happened with the previous delay scheduling as well (although it was likely less noticeable, as @tgravescs pointed out)
  • It won't "always" use ANY for "all" tasks in the taskset. For example, if a single executor is added that is NODE_LOCAL and is used for scheduling, the very first resourceOffer would switch the taskset to NODE_LOCAL.

I do see how this handles the race condition on startup, although more specific handling of that case may be preferable (as @tgravescs also mentioned).
It seems questionable whether executor added should instantly reset the locality levels.

Deferring to @tgravescs and @cloud-fan to make the call here.

@tgravescs (Contributor):

I filed a follow-up JIRA, https://issues.apache.org/jira/browse/SPARK-31856?filter=-2, to look at handling executorAdded better; this fix works for now.

// level in terms of better data locality. For example, say the previous locality
// levels are [PROCESS, NODE, ANY] and current level is ANY. After recompute, the
// locality levels are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level.
currentLocalityIndex = getLocalityIndex(myLocalityLevels.diff(previousMyLocalityLevels).head)
Comment from @Ngone51 (Member, Author) on the diff above, May 29, 2020:

Hi all, there was a defect in the previous implementation (always resetting currentLocalityIndex to 0). Think about a case where we have locality levels [PROCESS, NODE, ANY] and the current locality level is ANY. After recompute, we might have locality levels [PROCESS, NODE, RACK, ANY]. In this case, I think we'd better shift to the RACK level instead of the PROCESS level, since the TaskSetManager has already been delayed for a while on the known levels (PROCESS, NODE). This update should also ease our concern about a possible perf regression introduced by aggressive locality level resetting. @bmarcott @tgravescs @cloud-fan
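
As a small, self-contained illustration of the shift described above (plain strings stand in for the TaskLocality values):

val previousLevels   = Seq("PROCESS_LOCAL", "NODE_LOCAL", "ANY")                // current level: ANY
val recomputedLevels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

// diff keeps only the levels newly added by the recompute
val newlyAdded = recomputedLevels.diff(previousLevels)                          // List(RACK_LOCAL)

// Shift to the first newly added level rather than all the way back to PROCESS_LOCAL.
val newIndex = recomputedLevels.indexOf(newlyAdded.head)                        // 2, i.e. RACK_LOCAL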

Reply from a Contributor:

Yes, this is one of the cases I was referring to. Ideally you would never run into it, because a host is on a rack, so you would always have the rack level. Unfortunately, Spark defaults the rack to None, so you can. I was going to improve upon it in the JIRA I filed. We can certainly handle some here if you want.

Reply from @Ngone51 (Member, Author):

We can certainly handle some here if you want

What do you mean by "handle some here"? I read your JIRA but didn't find a specific solution that could be added to this PR. Could you please elaborate?

Reply from a Contributor:

I just mean there are a bunch of corner cases, and I don't think resetting this on every executor added is ideal; I did not list all of them out. On YARN it actually defaults to default-rack rather than None, so it won't have this issue, because every node has a rack. I agree that the code you have here is an improvement for handling the rack case.

@SparkQA commented May 29, 2020

Test build #123273 has finished for PR 28656 at commit 8744f1e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member, Author) commented May 29, 2020

retest this please.

@SparkQA commented May 29, 2020

Test build #123290 has finished for PR 28656 at commit 8744f1e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

retest this please

@SparkQA commented May 31, 2020

Test build #123325 has finished for PR 28656 at commit 8744f1e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

retest this please

@SparkQA commented May 31, 2020

Test build #123346 has finished for PR 28656 at commit 8744f1e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan closed this in bc24c99 on Jun 1, 2020
@Ngone51 (Member, Author) commented Jun 2, 2020

thanks all!
