
Conversation

@bmarcott (Contributor) commented Nov 27, 2019

What changes were proposed in this pull request?

Currently, the time window measured against the locality wait timeout is the time since the last task was launched for a TSM. The proposed change is to instead measure the time since this TSM's available slots were last fully utilized.

The number of available slots for a TSM is determined by dividing all slots among the TSMs according to the scheduling policy (FIFO vs FAIR).
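
As a rough illustration of the proposed heuristic, here is a minimal Scala sketch with hypothetical names (not the actual patch): the locality timer no longer resets on every task launch, but only while the TSM is using all of the slots assigned to it.

```scala
// Illustrative sketch only; LocalityTimer is a hypothetical stand-in, not a Spark internal.
class LocalityTimer(clock: () => Long) {
  private var lastResetTime: Long = clock()

  // Old heuristic: reset on every task launch for this TSM.
  def onTaskLaunched(): Unit = lastResetTime = clock()

  // Proposed heuristic: reset only while the TSM fully utilizes its assigned slots.
  def onSchedulingRound(runningTasks: Int, availableSlots: Int): Unit = {
    if (runningTasks >= availableSlots) {
      lastResetTime = clock()
    }
  }

  // The TSM falls back to the next locality level once this returns true.
  def waitExceeded(localityWaitMs: Long): Boolean =
    clock() - lastResetTime > localityWaitMs
}
```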

Why are the changes needed?

  • The cluster can become heavily underutilized, as described in SPARK-18886.

How was this patch tested?

PoolSuite
TaskSchedulerImplSuite
TaskSetManagerSuite

Thoughts on this approach?
@viirya
@squito
@kayousterhout

@dongjoon-hyun changed the title from "[WIP][SPARK-18886] Only reset scheduling delay timer if allocated slots are fully utilized" to "[WIP][SPARK-18886][CORE] Only reset scheduling delay timer if allocated slots are fully utilized" Nov 28, 2019
@cloud-fan (Contributor) commented:

ok to test

@cloud-fan (Contributor) commented:

This problem needs sufficient discussion. AFAIK, the issue with delay scheduling is: there is a timer per task set manager, and the timer gets reset as soon as one task from this task set manager gets scheduled on a preferred location.

A stage may keep waiting for locality and not leverage available nodes in the cluster, if its task duration is shorter than the locality wait time (3 seconds by default).

A simple solution is: we never reset the timer. When a stage has been waiting long enough for locality, this stage should not wait for locality anymore. However, this may hurt performance if the last task is scheduled to a non-preferred location, and a preferred location becomes available right after this task gets scheduled, and locality can bring 50x speed up.

I don't have a good idea now. cc @JoshRosen @tgravescs @vanzin @jiangxb1987

@SparkQA commented Dec 3, 2019

Test build #114755 has finished for PR 26696 at commit 06ca01f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Dec 3, 2019

retest this please

@viirya (Member) commented Dec 3, 2019

AFAIK, the issue with delay scheduling is: there is a timer per task set manager, and the timer gets reset as soon as one task from this task set manager gets scheduled on a preferred location.
A stage may keep waiting for locality and not leverage available nodes in the cluster, if its task duration is shorter than the locality wait time (3 seconds by default).

By adjusting the locality wait time, can't users make a better trade-off between locality and cluster utilization? In this case, users can set a smaller locality wait time.

@cloud-fan (Contributor) commented:

@viirya The locality wait time is a global config, not per job/stage. Even if it's per job/stage, I'm not sure how to set an optimal value.

@SparkQA commented Dec 3, 2019

Test build #114760 has finished for PR 26696 at commit 06ca01f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor) commented:

Can you please expand on the description? It sounds like you are saying you fill in all available slots no matter what the locality is, and then reset the timer? That would essentially bypass the locality wait on all tasks in the first "round" of scheduling, which is not what we want.

, if its task duration is shorter than the locality wait time (3 seconds by default).

It doesn't have to be strictly shorter than the wait time, as long as the harmonics of when tasks finish and get started are shorter than that. I've seen that happen before.

A simple solution is: we never reset the timer. When a stage has been waiting long enough for locality, this stage should not wait for locality anymore. However, this may hurt performance if the last task is scheduled to a non-preferred location, and a preferred location becomes available right after this task gets scheduled,

You're always going to have a race condition where, if you had waited a bit longer, you would have gotten locality; there is no way around that. I don't think just waiting a flat timeout makes sense. For instance, if I have 10,000 tasks, you are saying you only wait 3 seconds (by default) and then all of them can be scheduled. If you only have 10 executors (with 1 core each), only 10 can run in parallel, and that doesn't even try to get locality on the 9,990 other tasks.

I've been thinking about options here, and the one that seems the most straightforward to me is making it delay scheduling on a "slot". Here a "slot" is a place you can put a task. For example, if I have 1 executor with 10 cores (default 1 CPU per task), then I have 10 slots. When a slot becomes available, you set a timer to be whatever the current locality level is for that slot. The reason to use slots and not entire executors is that I could schedule 1 task on an executor with 10 slots and the other 9 would be wasted if you set the timer per executor.
When the timer goes off for that slot, you check to see if anything can be scheduled on it at that locality, if not you set the timer again to fall back to the next locality level (node -> rack).
This way you aren't wasting available resources. If you really want your tasks to wait for locality just set the timeout on those higher.
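
To make the per-slot idea concrete, here is a rough, hypothetical sketch (illustrative only, not an actual Spark change): each slot carries its own timer and falls back one locality level at a time when the timer expires.

```scala
// Hypothetical sketch of a per-slot delay timer (not Spark code).
object Locality extends Enumeration {
  val ProcessLocal, NodeLocal, RackLocal, Any = Value
}

class SlotDelay(localityWaitMs: Long, clock: () => Long) {
  private var level = Locality.ProcessLocal
  private var timerStart = clock()

  // Called when the slot becomes free: start waiting at the most local level.
  def reset(): Unit = { level = Locality.ProcessLocal; timerStart = clock() }

  // Called when nothing could be scheduled on this slot at the current level:
  // once the timer expires, fall back to the next level (node -> rack -> any).
  def maybeFallBack(): Locality.Value = {
    if (clock() - timerStart > localityWaitMs && level < Locality.Any) {
      level = Locality(level.id + 1)
      timerStart = clock()
    }
    level
  }
}
```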

Another option is per task, but I think that involves more tracking logic and will use more memory.

I think the per slot logic would work fine with the job scheduler scenario with the Fair scheduler as well.

The way Spark does it now just seems broken to me, even in the job scheduler case. At some point you just want to run; if you don't, again, you can simply increase the timeout. I'm going to go re-read the JIRAs to make sure I'm not missing anything though. Note that if someone is really worried about getting rid of the existing logic, we could keep it behind a config, but I personally think we shouldn't.

thoughts?

@cloud-fan (Contributor) commented:

The per-slot timer sounds promising to me. I'll think more about it in the following days.

@tgravescs (Contributor) commented:

so I re-read SPARK-18886 and really the solution mentioned by Kay was very similar - by using slots. In there she mentions it might be complicated to actually code up and use with the Fair scheduler, so the actual design/code changes will need to be investigated further to see how much change it actually requires, but that still seems like the best solution to me.

@bmarcott (Contributor, Author) commented Dec 3, 2019

@tgravescs I reworded the description for clarity; please lemme know if it still isn't clear.
Timers per slot will have issues when a stage finishes while slots are waiting. Then the next stage would ignore locality for those slots. It is more natural to think of the stages/tasks as having timers since they are the ones being starved, not the resources/slots.

The approach in this PR is similar to what Kay described. Only increase locality levels when your slots are not being utilized for some period of time.

test("SPARK-16106 locality levels updated if executor added to existing host") {
val taskScheduler = setupScheduler()

taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
@bmarcott (Contributor, Author) commented:

A note on why this line was necessary:

The current locality level (currentLocalityIndex) of a TSM is defaulted to the current "most local" level when a taskset is submitted. Before this line was added, that level was ANY as the scheduler was not aware of any resources.

Locality levels are recomputed whenever a new executor is detected, but the current locality level remains the same. Previously the test was passing because the first task that began running was NODE_LOCAL, hence the locality level was set to node local, causing the second task not to run.

Since this PR only resets the locality level when all slots are utilized, that locality level reset no longer happens.
This new line makes it so that the starting locality level is not ANY.

@tgravescs (Contributor) commented:

Yeah, my approach changes it to be slot delay vs. task delay, but it also depends on how we define task delay. This approach is a lot less code change, but we need to think through the use cases more.

@bmarcott (Contributor, Author) commented Dec 4, 2019

What were the particular critiques of this PR?

I suggest reading SPARK-18886 (particularly Kay's comments) as well as the comments in this old PR.

The ideas below, which I have seen (and some of which I tried), have the following issues:

  1. Never reset timer: delay scheduling only works on first wave (as @tgravescs pointed out)
  2. Per slot timer: delay scheduling should apply per task/taskset (as I pointed out above) which might lead you to # 3
  3. Per slot per stage timer: tasks can be starved by being offered many different slots, each starting a new timer; also too much bookkeeping.
  4. Per task timer: you still need a way to distinguish between when a task is waiting for a slot to become available vs. when it has slots available but is not utilizing them (which is what this PR does). Doing this right seems to require this PR plus more timers.

@bmarcott changed the title from "[WIP][SPARK-18886][CORE] Only reset scheduling delay timer if allocated slots are fully utilized" to "[WIP][SPARK-18886][CORE] Make locality wait time be the time since a TSM's available slots were fully utilized" Dec 4, 2019
@cloud-fan (Contributor) commented:

I'm not very familiar with the resource manager part. To confirm: the task scheduler keeps tracking all the available resources (executor id -> # of cores), and this information is provided by the cluster manager. The task scheduler decides the # of slots for each TaskSetManager, and the TaskSetManager uses this info to decide if it has utilized all its slots and resets the timer if necessary.

Is my understanding correct?

@tgravescs (Contributor) commented:

Overall I think this is a good approach insofar as it doesn't require a lot of changes and, from what I've thought about so far, gets what we want. I still need to look more at the specifics of the fair scheduler as well.

So when to reset (or whether to reset) is definitely still a question; arguably you should never reset back to a lower level. If the definition is a task delay, then it should just be a delay from when it was submitted. In your current implementation, when it falls back is unpredictable: if I happen to schedule on a node-local slot and I'm full, then it falls back, but if I scheduled node-local and then at ANY, then it stays at ANY. You could take the other approach and always reset it back when you are fully utilized, or set it back to whatever the lowest level was in that pass, but then you could end up always waiting the 3 seconds for non-node-local again, which seems to go against the definition of the task delay. It really feels like we should never reset it.

For a true per-task timer, the question still comes into play of when you start that timer: some people may say from when the stage was submitted, some might say from when the task possibly could have run (which you then have to define: is that when there were enough slots, etc.).

@tgravescs (Contributor) commented:

Note on the fairscheduler side - I think you do have the issue Kay brought up in v2 of comment: https://issues.apache.org/jira/browse/SPARK-18886?focusedCommentId=15931009&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15931009

But on one hand I would argue it's not really an issue, or perhaps if it's truly not being fair then it's just a bug; I'm not super familiar with the fair scheduler so I need to spend some more time on it. I also need to look at your logic for computing whether it's utilized or not when using the fair scheduler.

@tgravescs (Contributor) commented:

I assume it fixes the performance on #26633? Thanks for adding more unit tests; did you run any manual tests as well?

Sorry I haven't gotten back to this; hopefully later today or tomorrow I will look more at the fair scheduler.

schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
var isFirst = true
for (schedulable <- sortedSchedulableQueue) {
schedulable.updateAvailableSlots(if (isFirst) numSlots else 0)
(Contributor) commented:

So I think this is only "first" because there is only one rootPool with FIFO; maybe add a comment here. I assume it's not really needed since sortedSchedulableQueue should just have one thing, but it's here just in case.

@squito (Contributor) commented:

sortedSchedulableQueue can have many tasksets even with FIFO. It's just all active tasksets in the order they were submitted -- which may be multiple tasksets from a single job with a branch, or from concurrent jobs.

I do think a comment is necessary here to explain this. If I understood right, the idea here is that all other tasksets follow the old logic on resetting the locality level -- for every single task that is scheduled, the locality timer gets reset. But for the first taskset, it only resets when the taskset is using all slots.

@bmarcott (Contributor, Author) commented:

@squito is correct here.
The idea is that for FIFO we assign all "available slots" to the first taskset/pool in the queue.
For FAIR we divide the total slots based on weights, not counting the weights of pools with no tasksets, and ensuring we always give at least minShare slots of availability.
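
For illustration, here is a compact sketch of that division under stated assumptions (hypothetical helpers, not the PR's code): FAIR splits the total slots by weight over pools that actually have tasksets, with minShare as a floor, while FIFO hands everything to the first schedulable.

```scala
// Illustrative only; PoolInfo and these helpers are hypothetical stand-ins.
case class PoolInfo(name: String, weight: Int, minShare: Int, hasTaskSets: Boolean)

def fifoAvailableSlots(totalSlots: Int, pools: Seq[PoolInfo]): Map[String, Int] =
  pools.zipWithIndex.map { case (p, i) =>
    p.name -> (if (i == 0) totalSlots else 0)   // first schedulable gets all slots
  }.toMap

def fairAvailableSlots(totalSlots: Int, pools: Seq[PoolInfo]): Map[String, Int] = {
  val usable = pools.filter(_.hasTaskSets)            // ignore pools with no tasksets
  val totalWeight = math.max(usable.map(_.weight).sum, 1)
  usable.map { p =>
    // weighted share, but never less than the pool's minShare
    p.name -> math.max(totalSlots * p.weight / totalWeight, p.minShare)
  }.toMap
}
```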

(Contributor) commented:

I'm not sure this makes sense. By default we run FIFO, and we can have multiple tasksets at the same time within a single job. So if we have plenty of slots to fit both tasksets in, we could end up with the same broken behavior we have now in the second taskset. It's perhaps slightly better, as when the first taskset finishes it does go more quickly, but in the meantime you could have wasted a lot of time.

@bmarcott (Contributor, Author) commented:

Thanks for pointing this out!
Instead of giving all slots to the first schedulable, I will change it so that if a schedulable has n tasks, it will only be given n slots, and proceed to the next until no slots remain.
I will try to apply similar logic to the FAIR case by including the number of tasks in the calculation and distributing any "unused" slots to the remaining tasksets based on weight.
What are your thoughts here?

(Contributor) commented:

so something like that is definitely better. I think you have to make sure that n tasks is the number of tasks running or pending, because a taskSet could have 10000 tasks and if 9999 of them are finished it really only needs 1 and the rest should go to the other tasksets. I'm not sure that is easily accessible right now from this location.

It kind of feels to me that FIFO should just have some boolean that is passed to the TSM to tell it whether there are remaining slots. With FIFO, if I have multiple task sets, they do go in priority order, but if the higher-priority one passes slots up, there is nothing keeping the other tasksets from using them. So I don't know why we should make the locality preference skewed. It's definitely not any worse than what we have now, and I'm not sure how you would really get away from this without going to something more like the node delay vs. task delay.

The fair scheduler side I need to think about more. I think it has a similar issue with the totalSlots vs. how many tasks it actually has left to run. For instance, say you have 2 pools with 1 taskset each, both with equal fair shares, and the first one has run enough that it only has 2 tasks left: do you send the rest of the free slots to the second TSM so it can use more of the cluster? I would generally say yes, and I think that is the way it acts now without this patch. The shares for the fair scheduler just appear to sort the tasksets, and they are visited in that order, but there doesn't appear to be any real hard enforcement of the limits.

val usableWeights = schedulableQueue.asScala
.map(s => if (s.getSortedTaskSetQueue.nonEmpty) (s, s.weight) else (s, 0))
val totalWeights = usableWeights.map(_._2).sum
usableWeights.foreach({case (schedulable, usableWeight) =>
(Contributor) commented:

Remove the extra () after foreach; you only need the {}. Also fix the spacing: it should be like foreach { case () =>

(Contributor) commented:

Also, I don't think this is correct. The numSlots is the number of currently available slots (ignoring used); here you try to compute what each one gets of those slots based on minShare/weight, but you aren't taking into account what each pool is already using. I might have one pool already using all the slots it should be and one pool using none. The one using all of its slots should get 0 available for it. Although that probably isn't quite right either, because the weights and minShares just control the order of the tasksets, so I think it can actually get more depending on other factors. For example, the taskset that is highest priority hasn't met its locality wait, so it then goes to the taskset that is using more but has met its locality wait (or doesn't have one). Then it could grab one, which would at least temporarily put it over its fair share.
This is why Kay was saying it's much harder here and was talking about using proxies to help indicate the conditions.

@bmarcott (Contributor, Author) commented:

this is computing how many slots each taskset/pool could ideally use if distributed according to scheduling policy, not how many more slots it can use in the next round of scheduling.

Say there are 10 total slots with two jobs running, each which have the same weight, so they are assigned a potential of 5 slots each. Also assume taskset 1 is using 6 slots and taskset 2 is using 0.
This code will determine that each taskset's availableSlots is 5, and since taskset 1 is using more than that, its timer will reset, whereas taskset 2's will not. This means taskset 2 will increase its locality level if the locality timer has expired, allowing it to utilize its "ideal" 5 slots.
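
A tiny worked version of that example, assuming availableSlots is computed as totalSlots * weight / totalWeights (all names and numbers here are illustrative only):

```scala
// Worked version of the example above; values are illustrative.
val totalSlots = 10
val weights = Map("taskset1" -> 1, "taskset2" -> 1)
val runningTasks = Map("taskset1" -> 6, "taskset2" -> 0)

val totalWeights = weights.values.sum
weights.foreach { case (name, w) =>
  val availableSlots = totalSlots * w / totalWeights        // 5 for each taskset
  val usingItsShare = runningTasks(name) >= availableSlots  // true only for taskset1
  println(s"$name: availableSlots=$availableSlots, resetTimer=$usingItsShare")
}
```

So taskset1 keeps resetting its timer, while taskset2's timer keeps running and lets it relax its locality once the wait expires.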

(Contributor) commented:

Oh I see, I misread when executorIdToCores(o.executorId) = o.cores was set, I thought it was set on every iteration and updated with free cores not the total.

@tgravescs (Contributor) commented:

So looking at the fair scheduler side of this, it brings me back to the real question of what the definition of a task wait is. If you look at Kay's example:

This relates to your idea because of the following situation: suppose you have a cluster with 10 machines, the job has locality preferences for 5 of them (with ids 1, 2, 3, 4, 5), and fairness dictates that the job can only use 3 slots at a time (e.g., it's sharing equally with 2 other jobs). Suppose that for a long time, the job has been running tasks on slots 1, 2, and 3 (so local slots). At this point, the times for machines 6, 7, 8, 9, and 10 will have expired, because the job has been running for a while. But if the job is now offered a slot on one of those non-local machines (e.g., 6), the job hasn't been waiting long for non-local resources: until this point, it's been running it's full share of 3 slots at a time, and it's been doing so on machines that satisfy locality preferences. So, we shouldn't accept that slot on machine 6 – we should wait a bit to see if we can get a slot on 1, 2, 3, 4, or 5.

She is essentially saying that I want my task to wait a bit when a slot becomes available and a task could be scheduled on it. This mostly comes into play when you have multiple tasksets or jobs being scheduled; if you only have a single job using FIFO with one taskset, it really doesn't matter.

If you have the case Kay mentions, and the next time you get a slot offered to you is past the delay when the task set fell all the way back to ANY locality, it would immediately schedule on that and it wouldn't delay at all. Now if you strictly go by the definition of how long a task has been waiting, that would be fine, but if it's from when it could have been scheduled, that is too aggressive.

@squito (Contributor) left a comment:

just a partial review, I still don't understand the fairscheduling part, will need to look at that more.

sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
// import org.slf4j.{Logger, LoggerFactory}
import org.apache.log4j.{Logger, Level}
Logger.getLogger("org.apache.spark.scheduler.TaskSetManager").setLevel(Level.DEBUG)
@squito (Contributor) commented:

I don't think we should check it in with logging turned up (if we really want to, we certainly should reset it for all other tests)

If you were doing this just for local testing, I find it's easier to just modify core/src/test/resources/log4j.properties locally; it's easier to make sure you don't commit changes to that file.

@bmarcott (Contributor, Author) commented:

yes this was just for local testing. will remove


}

val availableSlots = executorIdToCores.values.map(c => c / CPUS_PER_TASK).sum
rootPool.updateAvailableSlots(availableSlots)
@squito (Contributor) commented:

this looks a little expensive to be calling in every call to resourceOffer() -- most of the time that is called just one executor at a time, and each call does numExecs worth of work. So when you go through scheduling for all executors, you end up doing O(numExecs^2) work. could these two calls be moved into the branch which sets newExecAvail = true ?

@bmarcott (Contributor, Author) commented Dec 6, 2019:

yea, good suggestion.
I'll look into optimizing this.
Also need to take into account when an executor is removed.
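
A tiny sketch of the kind of incremental tracking suggested here (hypothetical names, assuming the total is adjusted only when executors are added or removed rather than recomputed on every offer):

```scala
// Illustrative only: maintain the slot total incrementally instead of summing
// over all executors inside every resourceOffers call.
class SlotTracker(cpusPerTask: Int) {
  private var totalSlots = 0

  def onExecutorAdded(cores: Int): Unit = totalSlots += cores / cpusPerTask
  def onExecutorRemoved(cores: Int): Unit = totalSlots -= cores / cpusPerTask

  def availableSlots: Int = totalSlots
}
```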

@bmarcott (Contributor, Author) commented Dec 6, 2019

hi folks, thanks for leaving more comments.
I am currently on vacation until the 15th of Dec, so will likely not be able to respond.
Once I get back I'll address comments.

@bmarcott (Contributor, Author) commented Dec 7, 2019

inline responses for @tgravescs

arguably you should never reset back to a lower level.

Yea I was thinking the same thing, but I decided not to touch that part yet, since I have mostly thought about when to reset.

Note on the fairscheduler side - I think you do have the issue Kay brought up in v2 of comment: https://issues.apache.org/jira/browse/SPARK-18886?focusedCommentId=15931009&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15931009

I do not believe this PR has that issue, as once the non-greedy taskset has waited its locality wait time it will increase locality level since it wasn't able to take advantage of its slots. Her proposed v1 solution had that problem since it only increased locality level if there were slots not being used by any taskset.

I assume it fixes the performance on #26633? thanks for adding more unit tests, did run any manual tests as well?

I haven't looked in depth at their problem, but it does seem it would solve most of the problem.
I have not yet had the chance to run manual tests.

so looking at the fairscheduler side of this it brings me back to the real question of what is the definition of a task wait. If you look at Kay's example:

I did not catch your point from the comment that started with the above quote. This PR handles the case she mentioned, since the locality level would not increase because the taskset is utilizing all its available slots (timers will keep getting reset). That means when the taskset is offered the new non-local resource, it wouldn't immediately take advantage of it.

@SparkQA commented Dec 18, 2019

Test build #115508 has finished for PR 26696 at commit 168ab30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val allocatedSlots = Math.max(
totalSlots * schedulable.weight / totalWeights,
schedulable.minShare)
if (numTasksRemaining < allocatedSlots) {
(Contributor) commented:

For each schedulable, we either allocate enough slots to fully satisfy the remaining tasks, or we don't allocate slots at all. Is this expected in the FAIR pool?

@bmarcott (Contributor, Author) commented:

For this first outer loop, each iteration will find all the schedulables which have leftover slots and redistribute them accordingly, looping until none have leftover slots. At this point no slots can be redistributed.

The loop starting on line 165 below handles the remaining schedulables.

@cloud-fan (Contributor) commented Dec 19, 2019:

Is it exactly the same as how the FAIR pool assigns resources?

@bmarcott (Contributor, Author) commented Dec 20, 2019:

You got me thinking more about this by using the word "exactly". It is the combination of TaskSchedulerImpl, Pool (including the FAIR/FIFO scheduling algorithms), and TaskSetManager (including delay scheduling), etc., which determines how resources are assigned.
The goal for this approach is to simulate scheduling without delay scheduling.
This helps determine how much you are underutilizing resources due to delay scheduling.

So far the most recent diff seems to fall short due to at least a couple reasons:

  1. Scheduling is different depending on whether TaskSchedulerImpl.resourceOffers is called one by one with single offers or with all offers in one batch. Schedulable.getSortedTaskSetQueue is called only once per resourceOffers call, meaning that for a batch call, it only follows the scheduling algorithm for the first task that is scheduled (seems like a bug).
  2. The approach doesn't exactly follow FAIR ordering, such as the minShareRatio and schedulable name based ordering found in FairSchedulingAlgorithm.

I have a rough idea for an alternative implementation which does a more direct simulation, utilizing the SchedulingAlgorithm trait directly. I'll do more thinking in the coming days.

@bmarcott (Contributor, Author) commented:

Issue 2 from above can be solved by utilizing the SchedulingAlgorithm as mentioned, but issue 1 still remains. A couple more problematic areas:

I'm trying to think of an idea where the TSM will report directly if it rejected due to delay scheduling, but I am having trouble thinking how to utilize that data due to problem 1. in previous comment.

@SparkQA commented Dec 19, 2019

Test build #115538 has finished for PR 26696 at commit f9a85f9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor) commented:

@bmarcott thanks for the update, I probably won't have time to review until the new year due to holidays

@bmarcott (Contributor, Author) commented Dec 24, 2019

I think I came up with a much better approach here.
It avoids trying to simulate scheduling logic like the previous approach, which had a lot of discrepancies as well as high time complexity.

This change makes TaskSetManager.resourceOffer return an explicit boolean saying whether it rejected the resource due to delay scheduling or not.

An isAllFreeResources boolean parameter was also added to TaskSchedulerImpl.resourceOffers which tells the scheduler the offers represent all free resources as opposed to a single resource.

Then, timers will be reset only if there were no resources rejected due to scheduling delay since the last offer which included all free resources.

example event sequence:
offer 1 resource that was rejected - no timer reset
offer all resources with no rejects - timer is reset
offer 1 resource, no reject - timer is reset
offer 1 resource that was rejected - no timer reset
offer 1 resource, no reject - no timer reset because previous offer was rejected

Here is a breakdown of when resources are offered (not changed):
Single executors are offered when:

  • a task finishes
  • new executor launched

All free resources are offered when:

  • continually every spark.scheduler.revive.interval seconds (default 1 second)
  • on taskset submit
  • when a task fails
  • speculationScheduler on fixed delay revives if there are any speculative tasks
  • executor lost

One remaining case that isn't handled:
Before any "all free resource" offer, all free resources are offered one by one and all not rejected.
This case should reset the timer, but won't with current impl.

Thoughts or know of any other issues with this approach?
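
A condensed, hypothetical sketch of the bookkeeping described above (simplified names, not the actual patch): the TSM reports whether an offer was rejected purely because of delay scheduling, and the timer is reset only while no such reject has been seen since the last offer that covered all free resources.

```scala
// Illustrative sketch of the proposed reset rule; DelayTracker is a hypothetical name.
class DelayTracker(clock: () => Long) {
  private var lastResetTime: Long = clock()
  private var noRejectsSinceLastAllFreeOffer = true

  def recordOffer(isAllFreeResources: Boolean, rejectedDueToDelay: Boolean): Unit = {
    if (isAllFreeResources) {
      // Every "all free resources" offer starts a fresh window.
      noRejectsSinceLastAllFreeOffer = !rejectedDueToDelay
    } else if (rejectedDueToDelay) {
      noRejectsSinceLastAllFreeOffer = false
    }
    // Reset only while delay scheduling has not held anything back in this window.
    if (noRejectsSinceLastAllFreeOffer) {
      lastResetTime = clock()
    }
  }

  def waitExceeded(localityWaitMs: Long): Boolean = clock() - lastResetTime > localityWaitMs
}
```

Walking the event sequence above through this sketch reproduces the same reset decisions.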

@bmarcott (Contributor, Author) commented Jan 4, 2020

happy new year folks 🎉 🎈

@cloud-fan (Contributor) commented:

offer 1 resource that was rejected - no timer reset
offer all resources with no rejects - timer is reset
offer 1 resource, no reject - timer is reset
offer 1 resource that was rejected - no timer reset
offer 1 resource, no reject - no timer reset because previous offer was rejected

There are 4 "offer 1 resource", typo?

@tgravescs (Contributor) commented Jan 6, 2020

Thanks for the update and proposal. I think it would be useful to add in a high-level description of what exactly the approach is trying to do. You have how it's trying to do it and an example, but I think what it's trying to do is just as useful.

Please correct me if I'm wrong, but I think essentially at a high level you are proposing to only reset the timer whenever all the currently free resources are scheduled on. So if you start out with 4 executors, you don't reset the timer until all of them have been scheduled. At that point you reset the timer. Which means that when a running task finishes and you go to start a new one or a new executor comes in, it has to wait the timeout.
And for the fair scheduler case, the way you are passing the flags in handles it resetting the timer if it has used its fair share?

Also, just to clarify a few things:

offer 1 resource, no reject - timer is reset

So the assumption here is that we are out of all resources because we either had 1 task finish or we added a new executor, and if nothing was rejected we scheduled that fully and we were fully scheduled before this offer? This is why you have the last line: "offer 1 resource, no reject - no timer reset because previous offer was rejected" -> since the previous offer was rejected there are still free resources, so we don't reset.

@tgravescs (Contributor) commented:

One remaining case that isn't handled:
Before any "all free resource" offer, all free resources are offered one by one and all not rejected.
This case should reset the timer, but won't with current impl.

So I assume by this you mean the startup case, but I'm not sure that is true. You get an "all free resource" case when you first submitTasks.
I think there are 2 cases - static allocation and dynamic allocation. Generally with static allocation you will get your executors before you start any application code, so it won't matter if it makes offers before that. With dynamic allocation you generally won't have any executors, so perhaps this is the case where on submitTasks you offer all, but there are no offers because there are no executors yet. Which case are you referring to?

@bmarcott (Contributor, Author) commented Jan 7, 2020

offer 1 resource that was rejected - no timer reset
offer all resources with no rejects - timer is reset
offer 1 resource, no reject - timer is reset
offer 1 resource that was rejected - no timer reset
offer 1 resource, no reject - no timer reset because previous offer was rejected

There are 4 "offer 1 resource", typo?

no. Did I make a mistake?

@cloud-fan (Contributor) commented:

ah it's an event sequence. sorry I misread it.

I'd like to see the high-level description as well. What this PR does is pretty clear: if a TSM doesn't fully utilize its slots because of delay scheduling, fall back to the next locality level after "locality wait" time.

It's unclear to me how the new proposal implements it.

@bmarcott (Contributor, Author) commented Jan 7, 2020

I think it would be useful to add in a high level description of what exactly the approach is trying to do

I agree. We should all have a good idea of what the goal is here.
I understand the high-level goal to be to only let delay scheduling delay the taskset for the period determined by the locality wait. Delaying in this case means not scheduling a task when it would have been scheduled if it were not for delay scheduling (indicated by the new boolean returned by the TSM). We know there has been no delay when there have been no rejects since, and including, an "all resource offer".
I am now refraining from using the term "all resources utilized," since there are other reasons resources may not be fully utilized, such as blacklisting, which would have caused problems with my previous approach.

Please correct me if I'm wrong, but I think essentially at a high level you are proposing to only reset the timer whenever all the currently free resources are scheduled on. So if you start out with 4 executors, you don't reset the timer until all of them have been scheduled. At that point you reset the timer.

It is not strictly required that all 4 executors be utilized. The new approach directly measures effects due to delay scheduling. As long as there are no rejects due to delay scheduling the timer is reset.

Which means that when a running task finishes and you go to start a new one or a new executor comes in, it has to wait the timeout.

The timer is per TSM, not per task, so a single task doesn't necessarily have to wait when another one completes. The same applies to executors.
As long as the TSM is not rejecting resource offers due to delay scheduling, the timer is reset.

And for the fair scheduler case, the way you are passing the flags in handles it resetting the timer if it has used its fair share?

New flags are not passed in. New flags are returned by the TSM saying whether it rejected an offer due to delay scheduling.
FAIR and FIFO scheduling are handled the same way in the new approach: the scheduling determines which resources are offered to which TSMs, and the TSMs return whether they were "delayed."

I believe another interesting bug exists in the master code (timer reset on task launch): assume that because of FAIR scheduling (or any other reason) a TSM cannot launch tasks for the locality wait period. Currently that TSM would be penalized and would increase its locality level.
With the new approach I linked: there would be an all resource offer and that TSM would be offered 0 resources, but it also wouldn't reject any resources due to delay scheduling, so the timer would be reset.

Also just to clarifying a few things:

offer 1 resource, no reject - timer is reset

so the assumption here is that we are out of all resources because we either had 1 task finish or we added a new executor, and if nothing was rejected we scheduled that fully and we were fully scheduled before this offer? This is why you have the last line: "offer 1 resource, no reject - no timer reset because previous offer was rejected" -> since the previous offer was rejected there are still free resources, so we don't reset.

Whatever assumptions or circumstances caused the case, the fact is that the TSM is not being delayed due to delay scheduling in that case, so the timer should be reset.

When an "all resource offer" has no rejects, we know there is no delay, so we can reset the timer.
For single offers, we only know there has been no delay if there hasn't been any delay since the last all resource offer.

So I assume by this you mean the startup case, but I'm not sure that is true. You get an "all free resource" case when you first submitTasks.
I think there are 2 cases - static allocation and dynamic allocation. Generally with static you will get your executors before you start any application code, so it won't matter if it makes offers before that. With dynamic allocation generally you won't have any executors so this perhaps is the case on submitTasks you offer all but there are no offers because no executors yet. Which case are you referring to?

No, I believe the approach doesn't matter whether executors are statically or dynamically allocated.
The case I am referring to is: imagine you have 2 resources and an "all resource offer" is scheduled every second. When TSM1 is submitted, it'll also get an "all resource offer"; assume it rejects both, causing a preexisting TSM2 to utilize them. Assume those 2 tasks finish, and the freed resources are offered one by one to TSM1, which accepts both, all within 1 second (before any "all resource offer"). This should reset the timer, but it won't in the current implementation.

@bmarcott (Contributor, Author) commented Jan 7, 2020

ah it's an event sequence. sorry I misread it.

Sorry if it wasn't clear. I relabeled it as an event sequence.

I'd like to see the high-level description as well. What this PR does is pretty clear: if a TSM doesn't fully utilize its slots because of delay scheduling, fall back to the next locality level after "locality wait" time.

It's unclear to me how the new proposal implements it.

Hopefully my description above cleared it up. But lemme know if I am missing something or if it needs more explanation.

@bmarcott (Contributor, Author) commented Jan 7, 2020

@tgravescs @cloud-fan
Thanks for taking a look!

@bmarcott (Contributor, Author) commented:

hope you had a good weekend. any new thoughts?

@cloud-fan (Contributor) commented:

timers will be reset only if there were no resources rejected due to scheduling delay since the last offer which included all free resources.

Is this the key point? I'm not familiar with the resource offer part. When an offer is not "all resource", we never reset the timer?

@bmarcott (Contributor, Author) commented:

Is this the key point?

yes

I'm not familiar with the resource offer part.

I listed all the different cases in which all resource offers and single resource offers are made in one of my comments above. Hope that helps.

When an offer is not "all resource", we never reset the timer?

You can reset the timer on a single resource offer if there have been no rejects since the last all resource offer.

@cloud-fan (Contributor) commented:

sounds good, can you open a new PR for your new idea? then we can review and leave comments.

@tgravescs (Contributor) commented:

Sorry for my delay on this. I agree with @cloud-fan: can you open a new PR with the new method? It seems like a reasonable approach; I just have to walk through all the scenarios again.

@bmarcott (Contributor, Author) commented:

👍

@tgravescs (Contributor) commented:

@bmarcott please close this

@bmarcott closed this Mar 9, 2020
cloud-fan pushed a commit that referenced this pull request Apr 9, 2020
…ilization due to delay scheduling

### What changes were proposed in this pull request?

[Delay scheduling](http://elmeleegy.com/khaled/papers/delay_scheduling.pdf) is an optimization that sacrifices fairness for data locality in order to improve cluster and workload throughput.

One useful definition of "delay" here is how much time has passed since the TaskSet was using its fair share of resources.

However it is impractical to calculate this delay, as it would require running simulations assuming no delay scheduling. Tasks would be run in different orders with different run times.

Currently the heuristic used to estimate this delay is the time since a task was last launched for a TaskSet. The problem is that it essentially does not account for resource utilization, potentially leaving the cluster heavily underutilized.

This PR modifies the heuristic in an attempt to move closer to the useful definition of delay above.
The newly proposed delay is the time since a TaskSet last launched a task **and** did not reject any resources due to delay scheduling when offered its "fair share".

See the last comments of #26696 for more discussion.

### Why are the changes needed?

cluster can become heavily underutilized as described in [SPARK-18886](https://issues.apache.org/jira/browse/SPARK-18886?jql=project%20%3D%20SPARK%20AND%20text%20~%20delay)

### How was this patch tested?

TaskSchedulerImplSuite

cloud-fan
tgravescs
squito

Closes #27207 from bmarcott/nmarcott-fulfill-slots-2.

Authored-by: Nicholas Marcott <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023