[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks #21019
Conversation
Test build #89085 has finished for PR 21019 at commit

Jenkins, retest this please

Test build #89097 has finished for PR 21019 at commit

Jenkins, retest this please

Test build #89117 has finished for PR 21019 at commit

Jenkins, retest this please

Test build #89165 has finished for PR 21019 at commit

@squito @vanzin @cloud-fan

cc @jiangxb1987

I need to look more closely at the change, but your description of the problem makes sense. Can you also add a test case?

@squito

Test build #89264 has finished for PR 21019 at commit
jiangxb1987 left a comment

This seems to be a reasonable change, just some nits.
```scala
private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
  // Mark any map-stage jobs waiting on this stage as finished
  if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
```
Why do we need to double check that shuffleStage.isAvailable here?
doesn't seem this is necessary, as it's already handled at the call sites ... but IMO it seems safer to include it, in case this gets invoked elsewhere in the future.
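For reference, the helper under discussion looks roughly like the sketch below. Only the signature, the comment, and the guard appear in the quoted hunk; the statistics lookup and the loop over `mapStageJobs` are assumptions about the rest of the body, based on how `markMapStageJobAsFinished` is driven elsewhere in `DAGScheduler`.

```scala
// Sketch of the extracted helper; everything past the guard is an assumption.
private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
  // Mark any map-stage jobs waiting on this stage as finished
  if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
    // Map output statistics are what the waiting map-stage ActiveJobs are handed.
    val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
    for (job <- shuffleStage.mapStageJobs) {
      markMapStageJobAsFinished(job, stats)
    }
  }
}
```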
```scala
    Success,
    makeMapStatus("hostD", rdd2.partitions.length)))
  // stage1 listener still should not have a result, though there's no missing partitions
  // in it. Because stage1 is not inside runningStages at this moment.
```
nit: Because stage1 has failed and is not inside `runningStages` at this moment.
squito left a comment

lgtm, just some very minor comments.
```scala
private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
  // Mark any map-stage jobs waiting on this stage as finished
  if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
```
doesn't seem this is necessary, as it's already handled at the call sites ... but IMO it seems safer to include it, in case this gets invoked elsewhere in the future.
```scala
  complete(taskSets(2), Seq(
    (Success, makeMapStatus("hostC", rdd2.partitions.length))))
  assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
    HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
```
can just use Set here
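Applied to the assertion above, the suggestion would read roughly as follows (a sketch of the reviewer's nit, not the committed code):

```scala
// Plain Set instead of an explicit HashSet; element-wise equality is unchanged.
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
  Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
```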
```scala
  // After stage0 is finished, stage1 will be submitted and found there is no missing
  // partitions in it. Then listener got triggered.
  assert(listener2.results.size === 1)
}
```
can you also add assertDataStructuresEmpty() please? I know it's not really related to your change, but it's nice to include this in all the tests.
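With that suggestion applied, the tail of the test would look roughly like this sketch; `assertDataStructuresEmpty()` is the existing `DAGSchedulerSuite` helper the comment refers to, which checks that the scheduler's bookkeeping maps have been cleaned up:

```scala
  // After stage0 is finished, stage1 will be submitted and found to have no missing
  // partitions, so the listener gets triggered.
  assert(listener2.results.size === 1)
  // Reviewer's suggestion: verify the scheduler left no stale bookkeeping behind.
  assertDataStructuresEmpty()
}
```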
Thanks for the comments from Imran and Xingbo.

Test build #89426 has finished for PR 21019 at commit

LGTM

retest this please

Test build #89437 has finished for PR 21019 at commit

merged to master, thanks!
## What changes were proposed in this pull request?

SparkContext submits a map stage from `submitMapStage` to `DAGScheduler`; `markMapStageJobAsFinished` is called only in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933 and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314.

But consider the scenario below:

1. stage0 and stage1 are both `ShuffleMapStage` and stage1 depends on stage0;
2. We submit stage1 by `submitMapStage`;
3. While stage1 is running, `FetchFailed` happens, and stage0 and stage1 get resubmitted as stage0_1 and stage1_1;
4. While stage0_1 is running, speculated tasks from the old stage1 come in as succeeded, but stage1 is not inside `runningStages`. So even though all splits (including the speculated tasks) in stage1 succeeded, stage1's job listener will not be called;
5. stage0_1 finishes and stage1_1 starts running. In `submitMissingTasks` there are no missing tasks, but in the current code the job listener is not triggered.

We should call the job listener for the map stage in step 5.

## How was this patch tested?

Not added yet.

Author: jinxing <[email protected]>

Closes apache#21019 from jinxing64/SPARK-23948.

(cherry picked from commit 3990daa)
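Concretely, the change amounts to notifying the map-stage job listeners from `submitMissingTasks` when a resubmitted shuffle map stage turns out to have no missing partitions (step 5 above). A minimal sketch of that call site, with the surrounding control flow of `submitMissingTasks` paraphrased rather than quoted from the diff:

```scala
// In DAGScheduler.submitMissingTasks(stage, jobId): if there are no tasks left to run,
// the stage is marked finished; the fix also wakes up any map-stage jobs waiting on it.
if (tasks.isEmpty) {
  markStageAsFinished(stage, None)
  stage match {
    case shuffleStage: ShuffleMapStage =>
      // New: trigger listeners of jobs registered via submitMapStage (the missed case).
      markMapStageJobsAsFinished(shuffleStage)
    case _: ResultStage => // result stages have no map-stage listeners to notify
  }
  submitWaitingChildStages(stage)
}
```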
a few minutes after merging this I realized I should have also merged to branch 2.3. I don't see a way to do that without another PR. oops. I opened this, it's a clean cherry-pick #21085

`git cherry-pick -x -s && git push`

I guess I rely entirely on the merge script, but in these simple cases I should just do the push directly ...

@squito @jiangxb1987