[SPARK-29551][CORE] Fix a bug about fetch failed when an executor is lost #26206
Conversation
@cloud-fan @gatorsmile Kindly review, thanks.
Can one of the admins verify this patch?
Force-pushed from 1607bb4 to 78bf757.
Can you explain what the regression is? I have no idea what problem you are trying to fix.
When an executor is lost for some reason (e.g. the external shuffle service or the host itself goes down on the executor's host) and this coincides with a reduce stage fetching from it, today the scheduler only clears the shuffle status of the single mapper that triggered the fetch failure; the other map outputs on that executor are also unavailable, but the DAGScheduler does not know it, so the reduce stage keeps hitting fetch failures and recovers only through nested stage retries. As we all know, the existing code only calls mapOutputTracker.removeOutputsOnHost(host) or mapOutputTracker.removeOutputsOnExecutor(execId) while the executor is still considered active. So we should distinguish the failedEpoch of 'executor lost' from the fetchFailedEpoch of 'fetch failed' to solve the above problem.
…lost
- There will be a regression when an executor is lost and then causes a 'fetch failed'.
- Add fetchFailedEpoch to solve the above problem.
Force-pushed from 78bf757 to e1574fe.
Can you please add the description to the first message, and change the title so it describes the change? Please specify whether this is a regression from Spark 2.4 or a proposed new feature. You talk about the specific code in MapOutputTracker, but can you please say what the user sees (does the job fail, does it hang, etc.)? Fetch failed is a large category.
Yea, please describe the problem from an end-user's perspective.
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]
// There will be a regression when an executor lost and then causes 'fetch failed'.
"There will be a regression" is misleading, because it sounds like this PR causes a regression.
assert(mapStatus2(2).location.host === "hostB")
}

test("All shuffle files on the executor should be cleaned up when executor lost " +
For a bug-fix PR, we had better add the SPARK-29551 prefix to the test case name.
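As a rough sketch of what the reviewer is asking for (the renamed test title below is an assumption for illustration, not text taken from the PR):

```scala
// Hypothetical renamed test header with the JIRA prefix; the test body would stay unchanged.
test("SPARK-29551: All shuffle files on the executor should be cleaned up " +
  "when executor lost") {
  // ... existing assertions on mapOutputTracker.shuffleStatuses ...
}
```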
assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7))

val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
// val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
Let's revert this change. This kind of cleanup is good in general, of course, but it makes the backport difficult.
afterEach()
val conf = new SparkConf()
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "false")
nit. "spark.files.fetchFailure.unRegisterOutputOnHost" -> UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key?
done.
))
// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
// val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
Shall we remove this commented-out code?
done.
assert(mapStatus2(2).location.executorId === "exec-hostB")
assert(mapStatus2(2).location.host === "hostB")
}
}
Indentation seems to be corrupted at some point before this line.
}
}

test("All shuffle files on the host should be cleaned up when host lost") {
The same comments apply to this test case (test case name, use the config key instead of a string literal, indentation).
done.
dongjoon-hyun left a comment
Hi @weixiuli .
According to the previous comment, you need to keep the PR description up-to-date.
For example, #26206 (comment) should be in the PR description, because the PR description will eventually become the commit message.
Can you fill in this section? If it's a bug fix, how does the bug affect end users? Or is it only about performance?
What problem are you trying to resolve here? If some jobs failed, can you please attach the related logs to the JIRA?
Without this PR, when the above happens, more stages of the job get submitted and the job finishes very slowly. I have described the user-facing impact in that section.
I think I see your point: on a fetch failure you don't want to remove the executor, because removing it means recomputing the stage; in your PR you allow retries at the task level instead of just removing the executor. But what confuses me is that fetching data already has a retry mechanism, and we have added some PRs to fix fetch-failure problems at a lower level, such as:

Do we need to do that here as well?
Disclaimer: I haven't read this thread in detail yet, so maybe this was already discussed. Is this PR related to the problems I describe in SPARK-27736 "Improve handling of FetchFailures caused by ExternalShuffleService losing track of executor registrations"?
@joshrosen-stripe Yeah, this PR may solve the problem discussed in SPARK-27736.
@weixiuli can you describe the problem and your fix at a high level, i.e. without mentioning the detailed code?
This is pretty hard to understand; please use phrases like "clear shuffle status for a host", "clear shuffle status for a mapper", etc.
Just found this PR. We hit the issue described in SPARK-27736 recently, and for some other reasons we don't enable NodeManager recovery.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
When an executor is lost for some reason (e.g. the external shuffle service or the host itself goes down on the executor's host) and this coincides with a reduce stage fetching from it, the fetch fails, which is really bad. Today the scheduler only calls mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress) to clear the shuffle status of that single mapper, but the other map outputs on the executor are also unavailable and the DAGScheduler does not know that. The reduce stages therefore hit fetch failures again when they fetch those outputs, and the unavailable shuffle status can only be recovered through nested stage retries, which is the regression.
As we all know, when a reduce stage hits a fetch failure while the executor is still considered active, the scheduler calls mapOutputTracker.removeOutputsOnHost(host) to clear the shuffle status on the host, or mapOutputTracker.removeOutputsOnExecutor(execId) to clear it on the executor. However, it does nothing when the executor-lost event has already been handled, which is really bad.
So we should distinguish the failedEpoch of 'executor lost' from the fetchFailedEpoch of 'fetch failed' to solve the above problem.
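As a rough illustration of the proposed distinction, here is a minimal sketch under assumed names (EpochTracker, shouldUnregisterOutputs), not the actual Spark patch:

```scala
// Illustrative sketch only: keep a separate epoch map for fetch failures so that a
// fetch failure can still unregister all shuffle outputs on an executor (or host)
// even when the executor-lost handler has already recorded the same epoch.
import scala.collection.mutable

class EpochTracker {
  // Epoch at which an executor was last marked as lost (mirrors DAGScheduler.failedEpoch).
  private val failedEpoch = new mutable.HashMap[String, Long]
  // Epoch at which we last reacted to a fetch failure from this executor (the new map).
  private val fetchFailedEpoch = new mutable.HashMap[String, Long]

  // Called when an executor-lost event arrives.
  def markExecutorLost(execId: String, epoch: Long): Unit = {
    failedEpoch(execId) = epoch
  }

  // Called when a FetchFailed result arrives from execId. Returns true when the caller
  // should clear all shuffle status for the executor or host, e.g. via
  // mapOutputTracker.removeOutputsOnExecutor(execId) or removeOutputsOnHost(host).
  // Checking fetchFailedEpoch instead of failedEpoch means a prior executor-lost event
  // no longer suppresses the cleanup triggered by the fetch failure.
  def shouldUnregisterOutputs(execId: String, epoch: Long): Boolean = {
    if (!fetchFailedEpoch.contains(execId) || fetchFailedEpoch(execId) < epoch) {
      fetchFailedEpoch(execId) = epoch
      true
    } else {
      false
    }
  }
}
```

With bookkeeping like this, a fetch failure that arrives after the executor-lost event still triggers removeOutputsOnExecutor / removeOutputsOnHost instead of being skipped because failedEpoch was already up to date.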
Why are the changes needed?
The regression has been described above.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests.