[SPARK-24819][CORE] Fail fast when no enough slots to launch the barrier stage on job submitted #22001
Conversation
so this breaks barrier execution on Mesos completely? (since the available slots would be 0, it will just fail)
but finegrained is being deprecated...
Only MesosFineGrainedSchedulerBackend will break; we still support MesosCoarseGrainedSchedulerBackend.
@jiangxb1987 Could you create a JIRA and link here?
should this be saved instead of re-computed for each stage?
As mentioned in the method description of SchedulerBackend.getNumSlots():
* Note that please don't cache the value returned by this method, because the number can change
* due to add/remove executors.
It should be fine to cache the value across different stages of a job, but that requires a few more changes that would make the current PR more complicated.
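For context, a minimal sketch of the kind of query being discussed on the scheduler backend, using the maxNumConcurrentTasks name suggested later in this review (not the PR's exact diff):

```scala
// Sketch only: the real method lives on org.apache.spark.scheduler.SchedulerBackend.
trait BarrierSlotQuery {
  /**
   * Max number of tasks that can be launched concurrently at the time of the call.
   * Don't cache the returned value: it changes as executors are added or removed.
   */
  def maxNumConcurrentTasks(): Int
}
```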
Test build #94245 has finished for PR 22001 at commit
ok to test
retest this please
Test build #94278 has finished for PR 22001 at commit
retest this please
Test build #94283 has finished for PR 22001 at commit
Test build #94307 has finished for PR 22001 at commit
How about maxConcurrentTasks?
executorDataMap.values.map { executor =>
  executor.totalCores / scheduler.CPUS_PER_TASK
}.sum
We need a test that verifies that if the total number of slots is sufficient but some slots are running other jobs, we shouldn't fail the barrier job.
move this wait code to the barrier suite, because it is only required there
Add a unit test for getNumSlots.
We should tolerate temporary unavailability here by adding wait or retry logic.
@jiangxb1987 Could you create a JIRA and link here?
0df8f74 to bf0eccc
Test build #94490 has finished for PR 22001 at commit
Test build #94491 has finished for PR 22001 at commit
825d2d9 to eb689ac
}

/**
 * Get the max number of tasks that can be concurrent launched currently.
How about like this?
* Get the max number of tasks that can be concurrently launched when the method is called.
* Note that please don't cache the value returned by this method, because the number can be
* changed by adding/removing executors.
private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
  ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
    .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
nit: a max -> max?
"a ... failure"
private[scheduler] val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]

/**
 * Time in seconds to wait between a max concurrent tasks check failure and the next check.
nit: a max -> max?
logWarning("The job requires to run a barrier stage that requires more slots than the " +
  "total number of slots in the cluster currently.")
jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
Is it OK that this increment is not atomic? In the following scenario, the value may not be correct:
- We assume jobIdToNumTasksCheckFailures(jobId) = 1
- Thread A executes L963, then numCheckFailures = 2
- Thread B executes L963, then numCheckFailures = 2
- Thread B executes L964 and L965, then jobIdToNumTasksCheckFailures(jobId) has 2.
- Thread A executes L964 and L965, then jobIdToNumTasksCheckFailures(jobId) has 2.

Since two threads detected a failure, we expect listener.jobFailed(e) to be called, but it is not.
+1. Use atomic updates from ConcurrentHashMap. Update the counter and then check max failures.
@kiszk IIUC, there's exactly one thread in the eventLoop, so the scenario mentioned above will not happen. And I even feel there is no need to use ConcurrentHashMap for jobIdToNumTasksCheckFailures at all. @jiangxb1987
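For reference, if the increment ever had to be thread-safe (e.g. run outside the single-threaded event loop), a sketch of an atomic per-key update; CheckFailureCounter and recordCheckFailure are illustrative names, not code from this PR:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

object CheckFailureCounter {
  private val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]()

  private val addOne = new BiFunction[Int, Int, Int] {
    override def apply(current: Int, one: Int): Int = current + one
  }

  /** Atomically increments and returns the failure count for the given job. */
  def recordCheckFailure(jobId: Int): Int =
    jobIdToNumTasksCheckFailures.merge(jobId, 1, addOne)
}
```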
Test build #94489 has finished for PR 22001 at commit
Test build #94493 has finished for PR 22001 at commit
Test build #94495 has finished for PR 22001 at commit
test this please
private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
  ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
    .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
| "to jobs that contain one or more barrier stages, we won't perform the check on " + | ||
| "non-barrier jobs.") | ||
| .timeConf(TimeUnit.SECONDS) | ||
| .createWithDefaultString("10s") |
Would you make the default higher, like 30s? This is to cover the case when an application starts immediately with a barrier stage while the master is still adding new executors. Let me know if this won't happen.
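If the default stays at 10s, an application that needs more headroom could raise it itself; a minimal spark-shell-style sketch, assuming the config key lands exactly as spelled in the hunk above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Raise the wait between consecutive max-concurrent-tasks checks from the 10s default to 30s,
// e.g. for applications that submit a barrier stage while executors are still being allocated.
val conf = new SparkConf()
  .setAppName("barrier-demo")
  .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "30s")
val sc = new SparkContext(conf)
```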
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
  case e: Exception if e.getMessage ==
== -> .contains() in case the error message is nested
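For example, a small self-contained sketch of the suggested guard (the message text here is a stand-in for the DAGScheduler constant, not its real value):

```scala
object MessageMatchSketch {
  // Stand-in for DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER.
  val BarrierSlotsError = "requires more slots than the total number of slots"

  // contains() instead of == still matches when the message is nested or prefixed by a wrapper.
  def isBarrierSlotsError(e: Exception): Boolean =
    e.getMessage != null && e.getMessage.contains(BarrierSlotsError)
}
```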
} catch {
  case e: Exception if e.getMessage ==
      DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
    logWarning("The job requires to run a barrier stage that requires more slots than the " +
Please include jobId, stageId, requested slots, and total slots in the log message.
  )
  return
} else {
  listener.jobFailed(e)
do you expect the same job to be submitted again? if not, we should remove the key from the hashmap.
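A hypothetical, trimmed-down shape of the bookkeeping to illustrate the suggested cleanup (CheckFailureTracker and onCheckFailure are illustrative names, not code from this PR):

```scala
import java.util.concurrent.ConcurrentHashMap

class CheckFailureTracker(maxFailures: Int) {
  private val failures = new ConcurrentHashMap[Int, Int]()

  /** Returns true if the job may be retried; false if it should be failed (and forgotten). */
  def onCheckFailure(jobId: Int): Boolean = {
    val n = failures.getOrDefault(jobId, 0) + 1
    if (n < maxFailures) {
      failures.put(jobId, n)
      true
    } else {
      failures.remove(jobId) // drop the counter so the map cannot grow without bound
      false
    }
  }
}
```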
/**
 * Number of max concurrent tasks check failures for each job.
 */
private[scheduler] val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]
How do entries in this map get cleaned?
// Submit a job to trigger some tasks on active executors.
testSubmitJob(sc, rdd)

eventually(timeout(5.seconds)) {
Maybe safer to let the task sleep longer and cancel the task once the conditions are met.
}

override def maxNumConcurrentTasks(): Int = {
  // TODO support this method for MesosFineGrainedSchedulerBackend
link to a JIRA
| "total number of slots in the cluster currently.") | ||
| jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0) | ||
| val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1 | ||
| if (numCheckFailures < DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) { |
Should make DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES configurable so users can specify unlimited retries if needed. Instead, we might want to fix the timeout since it is only relevant to cost.
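A sketch of how the limit could be exposed, following the ConfigBuilder pattern of the interval config above; the key name and default here are assumptions, not something defined in this PR, and the entry would live in Spark's internal config package:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical key and default; the PR itself only has the hard-coded constant.
val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
  ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
    .doc("Number of consecutive max concurrent tasks check failures allowed before failing " +
      "a job submission.")
    .intConf
    .createWithDefault(40)
```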
Test build #94649 has finished for PR 22001 at commit
test this please
Test build #94658 has finished for PR 22001 at commit
Test build #94672 has finished for PR 22001 at commit
retest this please
Test build #94676 has finished for PR 22001 at commit
retest this please
@shaneknapp Is the timeout due to concurrent workload on Jenkins workers? If so, shall we reduce the concurrency (more wait in the queue but more robust test results)?
 * Check whether the barrier stage requires more slots (to be able to launch all tasks in the
 * barrier stage together) than the total number of active slots currently. Fail current check
 * if trying to submit a barrier stage that requires more slots than current total number. If
 * the check fails consecutively for three times for a job, then fail current job submission.
It seems I cannot find the code for "consecutively for three times", only maxFailureNumTasksCheck?
@mengxr it looks like the builds are just taking longer and longer. :( if this continues to be an issue, we'll need to bump the timeout in dev/run-tests-jenkins.py again. also, we JUST bumped the timeout ~20 days ago:
Test build #94687 has finished for PR 22001 at commit
test this please
@shaneknapp Maybe we could scan the test history and move some super stable tests to nightly. Apparently, it is not a solution for now. I'm giving another try:)
@mengxr that is easier said than done... :) once the 2.4 cut is done, it might be time to have a discussion on the dev@ list about build strategies and how we should proceed w/PRB testing.
test this please
Test build #94705 has finished for PR 22001 at commit
retest this please
Test build #94716 has finished for PR 22001 at commit
Just curious. It is very interesting to me since the recent three tries consistently cause a timeout failure at the same test. In addition, other PRs look successful without timeout.
Test build #94721 has finished for PR 22001 at commit
@kiszk Thanks for the note! I reverted the change in DAGSchedulerSuite. Let's try Jenkins again.
Test build #94754 has finished for PR 22001 at commit
Test build #94752 has finished for PR 22001 at commit
Test build #94801 has finished for PR 22001 at commit
LGTM. Merged into master. Thanks!
What changes were proposed in this pull request?
We shall check whether the barrier stage requires more slots (to be able to launch all tasks in the barrier stage together) than the total number of active slots currently, and fail fast if trying to submit a barrier stage that requires more slots than the current total number.
This PR proposes to add a new method getNumSlots() to get the total number of currently active slots in SchedulerBackend; support for this new method has been added to all the first-class scheduler backends except MesosFineGrainedSchedulerBackend.
How was this patch tested?
Added new test cases in BarrierStageOnSubmittedSuite.
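For context, a minimal sketch of the kind of job this check protects, using the RDD barrier API; with 4 barrier tasks and only 2 local slots, submission is now expected to fail fast (after the configured number of checks) rather than hang:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BarrierFailFastDemo {
  def main(args: Array[String]): Unit = {
    // Only 2 task slots available locally.
    val conf = new SparkConf().setAppName("barrier-fail-fast").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // A barrier stage with 4 tasks cannot launch all tasks together on 2 slots,
    // so the job submission should fail fast instead of waiting forever for slots.
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
      .barrier()
      .mapPartitions(iter => iter.map(_ * 2))

    try {
      rdd.collect()
    } finally {
      sc.stop()
    }
  }
}
```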