
Conversation

@bmarcott
Contributor

@bmarcott bmarcott commented Apr 11, 2020

What changes were proposed in this pull request?

Remove the requirement to launch a task in order to reset the locality wait timer.

Why are the changes needed?

Recently #27207 was merged, but it contained a bug that leads to undesirable behavior.

The crux of the issue is that single resource offers couldn't reset the timer if there had been a previous reject followed by an allResourceOffer with no available resources.
This led to a problem where, once the locality level reached ANY, single resource offers were all accepted, leaving allResourceOffers with no resources to utilize (hence no task launched on an all-resource offer -> no timer reset). The task set manager would be stuck at the ANY locality level.

Noting down here the downsides of the reset conditions below, in case we want to follow up.
As this is quite complex, I could easily be missing something, so please comment/respond if you have more bad-behavior scenarios or find something wrong here.
The format is:

Reset condition

  • the unwanted side effect
    • the cause/use case

Below references to locality increase/decrease mean:

PROCESS_LOCAL, NODE_LOCAL ... .. ANY
    ------ locality decrease --->
   <----- locality increase -----

Task launch:

  • locality decrease:
    • Blacklisting, FAIR/FIFO scheduling, or task resource requirements can reduce the number of tasks launched
  • locality increase:
    • single task launch decreases locality despite many tasks remaining

No delay schedule reject since last allFreeResource offer

  • locality decrease:
    • locality wait shorter than the allFreeResource offer interval (such offers occur at least once per second)
  • locality increase:
    • single resource offers (or none) are not rejected despite many tasks remaining (other lower-priority tasks are utilizing the resources)

Current impl - No delay schedule reject since last (allFreeResource offer + task launch)

  • locality decrease:
    • all of the above
  • locality increase:
    • single resource accepted and task launched despite many tasks remaining

The current impl is an improvement over the legacy one (task launch) in that the unintended locality decrease case is similar, and the unintended locality increase case only occurs when the cluster is fully utilized.

For the locality increase cases, perhaps a config which specifies a certain % of tasks in a taskset to finish before resetting locality levels would be helpful.

If that were considered a good approach, then removing the task launch requirement would perhaps eliminate most of the downsides listed above.
Let me know if you have more ideas for eliminating the locality increase downside of No delay schedule reject since last allFreeResource offer.
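
To make the comparison above concrete, here is a rough sketch of the three reset conditions as boolean predicates. This is illustrative Scala only, not the actual TaskSetManager/TaskSchedulerImpl code; OfferRound and its field names are assumptions.

```scala
object LocalityResetConditions {
  // Illustrative only; not the actual Spark scheduler code.
  case class OfferRound(
      launchedTask: Boolean,                 // a task was launched in this round of offers
      noRejectSinceLastAllFreeOffer: Boolean // no delay-schedule reject since the last allFreeResource offer
  )

  // Legacy condition ("Task launch"): any task launch resets the timer.
  def legacyReset(r: OfferRound): Boolean = r.launchedTask

  // "No delay schedule reject since last allFreeResource offer": reset whenever
  // nothing was rejected since that offer, whether or not a task launched.
  def noRejectReset(r: OfferRound): Boolean = r.noRejectSinceLastAllFreeOffer

  // Current impl: both a task launch and no rejects since the last
  // allFreeResource offer are required.
  def currentReset(r: OfferRound): Boolean =
    r.launchedTask && r.noRejectSinceLastAllFreeOffer
}
```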

Does this PR introduce any user-facing change?

No

How was this patch tested?

TaskSchedulerImplSuite

Also manually tested similar to how I tested in #27207 using this simple app.

With the new changes, given a locality wait of 10s, the behavior is generally:
10 seconds of locality being respected, followed by a single full utilization of resources at the ANY locality level, followed by another 10 seconds of locality being respected, and so on.

If the legacy flag is enabled (spark.locality.wait.legacyResetOnTaskLaunch=true), the behavior is to schedule only PROCESS_LOCAL tasks (utilizing only a single executor).
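
For reference, a minimal way to set the relevant configs when reproducing this manually (a sketch only: the app body, app name, and master URL are placeholders; the flag name is the one quoted above):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalityWaitRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("locality-wait-repro")         // placeholder app name
      .setIfMissing("spark.master", "local[4]")  // placeholder; point at your cluster instead
      .set("spark.locality.wait", "10s")         // locality wait used in the manual test
      // legacy behavior: only a task launch resets the locality wait timer;
      // remove this line to test the new reset behavior
      .set("spark.locality.wait.legacyResetOnTaskLaunch", "true")
    val sc = new SparkContext(conf)
    try {
      // run a workload here, e.g. the simple test app linked in the description
    } finally {
      sc.stop()
    }
  }
}
```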

@cloud-fan
@tgravescs

@HyukjinKwon
Member

also cc @Ngone51 and @jiangxb1987

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-18886][CORE] remove requirement to launch a task to reset locality wait timer [SPARK-18886][CORE][FOLLOWUP] remove requirement to launch a task to reset locality wait timer Apr 12, 2020
@dongjoon-hyun
Member

ok to test

@SparkQA

SparkQA commented Apr 13, 2020

Test build #121153 has finished for PR 28188 at commit 2f810ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

cloud-fan commented Apr 13, 2020

LGTM, cc @tgravescs

@bmarcott
Contributor Author

In order to merge this, do I need to push again for the Linter (R) to run, or is that covered by the other test build?

@tgravescs
Contributor

I'll take a look today

@tgravescs
Contributor

Sorry, I didn't get to this today; I'll look in the morning. I thought we added this for a specific condition, so I want to think about it a bit.

@bmarcott
Contributor Author

I believe this is the reason we added it. This is your comment from the other PR:
@tgravescs

One thing I don't think I like is that if you are fully scheduled, we keep trying to schedule "all resources" but if there are no resources, then we continue to reset the timer. This means that it takes a long time to fall back in case where you may have multiple tasksets and the first task set rejects it and the second one takes it and the tasks are finishing such that you get an all resources offer in between the task finishes. In this scenario the first taskset can get starved. We would need to perhaps track this separately.

I will also spend some more time thinking on this.

@cloud-fan
Do we need to revert the other commit and later land this all as one commit?

@bmarcott bmarcott force-pushed the nmarcott-locality-fix branch from 2f810ac to e22caed on April 14, 2020 04:39
@bmarcott
Contributor Author

Meanwhile, I added a test case that matches the scenario you described in the other PR.

I changed the code such that launching a task is still required to reset the timer, but not launching a task does not prevent you from resetting on a follow-up offer.
This passes the new test and the previous ones.

It seems better to remove the requirement to launch a task, but I'm OK with this since it isn't worse than the legacy locality wait behavior.
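
As a reference for reviewers, here is a rough sketch of that reset decision as a small tracker. It is not the actual TaskSchedulerImpl code, and all names (noDelayScheduleRejects, isAllFreeResources, launchedAnyTask, noRejectsSinceLastReset) are assumptions used for illustration:

```scala
// Illustrative sketch of the reset decision described above; not the real scheduler code.
class DelayResetTracker {
  // True when every offer since the last reset opportunity was accepted
  // without a delay-scheduling reject.
  private var noRejectsSinceLastReset = true

  /** Returns true if the locality wait timer should be reset for this round of offers. */
  def shouldReset(
      isAllFreeResources: Boolean,     // was this an all-free-resources offer?
      noDelayScheduleRejects: Boolean, // nothing was rejected for locality this round
      launchedAnyTask: Boolean): Boolean = {
    if (!noDelayScheduleRejects) {
      // A reject happened: block resets until a clean all-free-resources round.
      noRejectsSinceLastReset = false
      false
    } else if (launchedAnyTask && (isAllFreeResources || noRejectsSinceLastReset)) {
      // Launching a task is still required to actually reset the timer.
      noRejectsSinceLastReset = true
      true
    } else {
      // A clean round that launches nothing no longer blocks a later reset.
      if (isAllFreeResources) noRejectsSinceLastReset = true
      false
    }
  }
}
```

The key difference from the earlier version is in the final branch: a clean allFreeResource offer that launches nothing now records that fact instead of discarding it, so a later offer that does launch a task can still reset the timer.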

@bmarcott bmarcott changed the title [SPARK-18886][CORE][FOLLOWUP] remove requirement to launch a task to reset locality wait timer [SPARK-18886][CORE][FOLLOWUP] allow follow up locality resets even if no task was launched Apr 14, 2020
@SparkQA

SparkQA commented Apr 14, 2020

Test build #121251 has finished for PR 28188 at commit e22caed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Apr 14, 2020

Test build #121263 has finished for PR 28188 at commit e22caed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@tgravescs
Contributor

I don't think we need to revert the original; this is 3.1, which won't go out for a while. I'd rather have us err towards allowing more scheduling than reset it too aggressively.

One thing I don't think I like is that if you are fully scheduled, we keep trying to schedule "all resources" but if there are no resources, then we continue to reset the timer. This means that it takes a long time to fall back in case where you may have multiple tasksets and the first task set rejects it and the second one takes it and the tasks are finishing such that you get an all resources offer in between the task finishes. In this scenario the first taskset can get starved. We would need to perhaps track this separately.

Ah right, I think originally I was wondering if we could track this separately, perhaps with a kind of starvation flag. I'll think about this some more as well.

@SparkQA

SparkQA commented Apr 14, 2020

Test build #121273 has finished for PR 28188 at commit e22caed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott
Contributor Author

I also updated the PR description with the new approach/fix as well as the scenarios that have bad behavior.

@bmarcott
Contributor Author

@tgravescs
In my manual test, locality wait is essentially ignored, so I feel we should get this in sooner rather than later (or revert the previous commit).

@tgravescs
Contributor

I'm fine with committing this as long as you file a follow-up JIRA to investigate a better approach for my concern with multiple task sets.

@bmarcott
Contributor Author

bmarcott commented Apr 15, 2020

Maybe I'm not fully understanding the new JIRA in relation to this PR.
This PR keeps the task launch condition for resetting the timer, which is what handles the multiple task set scenario (and causes the new test to pass).

@tgravescs
Contributor

Sorry, I missed your update; I need to review it, but I don't have time today. There is no reason to revert: 3.1 is not shipping yet, and I don't want to hurry something in.

@tgravescs
Contributor

Sorry for my delay here. I think the terminology you use of locality increase and decrease is a bit confusing to other people. There is the locality wait time, and then there are the different levels of locality ranging from process to any. Is a decrease going from any down to rack..., or is it a decrease in the wait time? So can you please clarify that in the description.

Reviewing in more detail now.

@tgravescs
Contributor

test this please

Contributor

@tgravescs tgravescs left a comment

Changes look good for the scenario described, thanks. If you can update the description, that would be great. I am rerunning tests just to be sure.

@SparkQA

SparkQA commented Apr 21, 2020

Test build #121587 has finished for PR 28188 at commit e22caed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott
Contributor Author

Thanks. I agree my terminology was confusing. I reversed the terms increase/decrease and provided a graph showing what I am referring to.

@tgravescs
Contributor

Thanks @bmarcott, merged to master.

@asfgit asfgit closed this in 8b77b31 Apr 22, 2020
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023