
Conversation

@viirya
Member

@viirya viirya commented Nov 3, 2015

What changes were proposed in this pull request?

Currently we increase the allowed locality level of tasks only when the time elapsed since the last task was launched at the current locality level exceeds a threshold.

However, it is possible that this elapsed time stays below the threshold while we have nevertheless waited a long time since the task set was created. In that case we should also increase the locality level so that more tasks can be executed.

spark.locality.wait is based on the last launch time. However, as the JIRA describes, we can wait too long measured from the task set creation time rather than from the last launch time. In that case we should also increase the locality level.

This patch introduces a new set of locality timers based on the time since the task set was submitted. Once the time elapsed since submission exceeds the corresponding timer, the allowed locality level of tasks is increased.
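
To illustrate the idea, here is a minimal Scala sketch (not the actual patch; the names `LocalityTimers`, `submitWaits`, and the other identifiers are invented for illustration). Locality is relaxed either when no task has launched at the current level for the usual per-level wait, or when too much time has passed since the task set was submitted:

```scala
// Illustrative sketch only, not the real TaskSetManager logic.
class LocalityTimers(
    localityWaits: Array[Long],   // existing per-level waits (spark.locality.wait.*)
    submitWaits: Array[Long],     // hypothetical per-level waits measured from submission
    taskSetSubmitTime: Long) {

  private var currentLevel = 0
  private var lastLaunchTime = taskSetSubmitTime

  /** Call whenever a task launches at the current level. */
  def recordLaunch(now: Long): Unit = { lastLaunchTime = now }

  /** Index of the least-local level we are currently allowed to schedule at. */
  def allowedLevel(now: Long): Int = {
    while (currentLevel < localityWaits.length - 1 &&
           (now - lastLaunchTime >= localityWaits(currentLevel) ||
            now - taskSetSubmitTime >= submitWaits(currentLevel))) {
      // Either rule fires: move on to the next, less local, level.
      lastLaunchTime = now
      currentLevel += 1
    }
    currentLevel
  }
}
```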

How was this patch tested?

TaskSetManagerSuite.

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44916 has finished for PR 9433 at commit f4437d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Nov 16, 2015

ping @rxin

@andrewor14
Contributor

@kayousterhout

@kayousterhout
Contributor

The reason the current delay scheduling algorithm doesn’t work this way is that it was designed with a scenario like this in mind: a bunch of jobs are using the same workers, so job A can only run a small number of tasks at a time. In that scenario it doesn’t make sense to look at the time since job A was submitted, because there will be points where a lot of time has elapsed simply because the fairness policy dictates that job A can’t run many tasks at once. If job A has the opportunity to run a task on a non-local worker, it probably doesn’t want to “waste” one of the few slots it’s allowed to use on a non-local executor, and instead prefers to wait spark.locality.wait to see if a local executor becomes available (for more about this, see the delay scheduling paper: http://elmeleegy.com/khaled/papers/delay_scheduling.pdf).

This patch addresses a different scenario, where the amount of concurrency the job can use is limited mostly by the locality wait rather than by a fairness policy. I think the ideal way to fix this problem is with the following policy: if the task set has been using fewer slots than it could be using (where “# slots it could be using” is all of the slots in the cluster if the job is running alone, or the job’s fair share otherwise) for some period of time, increase the locality level. The delay scheduling policy currently used by Spark essentially implements a very simplified version of this ideal policy: the way it determines whether the job is using as many slots as it could is simply to check whether a task has been launched recently. This patch adds another heuristic to get closer to the ideal policy.
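
A rough sketch of that ideal policy, using hypothetical helpers (`runningTasks`, `entitledSlots`, and the under-utilization timer are placeholders for the idea, not Spark APIs):

```scala
object IdealDelayPolicySketch {
  // Relax locality only once the task set has been using fewer slots than it is
  // entitled to for longer than the configured locality wait.
  def shouldRelaxLocality(
      runningTasks: Int,           // tasks this task set is currently running
      entitledSlots: Int,          // all slots if running alone, else the fair share
      underUtilizedSinceMs: Long,  // how long usage has been below the entitlement
      localityWaitMs: Long): Boolean = {
    runningTasks < entitledSlots && underUtilizedSinceMs >= localityWaitMs
  }
}
```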

I’m hesitant to merge this patch for a few reasons:
(1) It makes the scheduling policy harder to reason about, so while it will help performance for some people who run into this case and understand how to tune the configuration parameters, it is likely to confuse others (and possibly introduce scheduling bugs as we try to maintain this functionality).
(2) It also doesn’t implement the ideal policy, as described above. (Although implementing the ideal policy would likely be prohibitive, especially because we would need to do it in a backwards-compatible way.)

On the other hand, the use case this is addressing seems likely to be fairly common, since many folks run Spark jobs in a context where only one Spark job is running on a set of workers at a time.

@mateiz what are your thoughts on this?

As an aside, can you clarify the description of this PR to explicitly say that you’re introducing a new set of locality timers, based on the time since the task set was submitted?

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@viirya
Member Author

viirya commented Jun 14, 2016

@kayousterhout Sorry for the late reply.

Regarding your first concern, that job A might not want to waste one of its few slots on a non-local executor and would rather wait spark.locality.wait for a local one: since dequeueTask always dequeues pending tasks for the local executor first, job A can always run its tasks on the local executor whenever that is possible.
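
To illustrate the ordering (a simplified sketch, not the actual TaskSetManager.dequeueTask): for a given resource offer, more-local pending tasks are always tried before less-local ones, so raising the allowed locality level never causes a local task to be skipped:

```scala
object DequeueSketch {
  // Simplified: returns the chosen task index and the locality it was matched at.
  def dequeueTask(
      execId: String,
      host: String,
      allowAny: Boolean,
      pendingForExecutor: Map[String, Seq[Int]],
      pendingForHost: Map[String, Seq[Int]],
      pendingAnywhere: Seq[Int]): Option[(Int, String)] = {
    pendingForExecutor.getOrElse(execId, Seq.empty).headOption.map(t => (t, "PROCESS_LOCAL"))
      .orElse(pendingForHost.getOrElse(host, Seq.empty).headOption.map(t => (t, "NODE_LOCAL")))
      .orElse(if (allowAny) pendingAnywhere.headOption.map(t => (t, "ANY")) else None)
  }
}
```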

@SparkQA

SparkQA commented Jun 14, 2016

Test build #60484 has finished for PR 9433 at commit be6c4c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jun 14, 2016

@kayousterhout Regarding the concern that this may make the scheduling policy harder to reason about and confuse others: I think we can disable it by default (by using Long.MaxValue as the default waiting time), so that only people who know how to tune the configuration parameters will use it.
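
For example (the config key below is purely hypothetical, just to show the mechanism): with a default of Long.MaxValue the new condition never fires, so behavior is unchanged unless a user explicitly sets a smaller wait:

```scala
import org.apache.spark.SparkConf

object DisabledByDefaultSketch {
  // Hypothetical key; the Long.MaxValue default effectively disables the new behavior.
  def waitSinceSubmitMs(conf: SparkConf): Long =
    conf.getTimeAsMs("spark.locality.wait.sinceTaskSetSubmit", Long.MaxValue.toString)

  def relaxDueToSubmitTime(now: Long, submitTime: Long, waitMs: Long): Boolean =
    now - submitTime >= waitMs  // always false with the Long.MaxValue default
}
```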

@viirya
Member Author

viirya commented Jun 16, 2016

ping @kayousterhout

@viirya
Member Author

viirya commented Jul 4, 2016

@kayousterhout Any ideas on this?

@viirya
Member Author

viirya commented Oct 6, 2016

It seems there is no further interest in this, so I'm closing it now.
