[SPARK-6737] Fix memory leak in OutputCommitCoordinator #5397
Test build #29803 has started for PR 5397 at commit |
Wouldn't it be better to put these three lines (at least) in a separate method:
private def endStage(stage: Stage): Unit = {
  stage.latestInfo.stageFailed(failureReason)
  outputCommitCoordinator.stageEnd(stage.id)
  listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
That would make it harder to miss things like this.
It actually looks like handleTaskCompletion has a nested markStageAsFinished method that looks like it should do this. That method also removes the stage from runningStages, which doesn't appear to happen in all of the paths where we post SparkListenerStageCompleted. Let me take a look at this and see whether there's a safe way to refactor things to use this method.
@vanzin, I've refactored this code to extract the stage completion code into a new |
This was redundant, since markStageAsFinished removes the stage from runningStages.
Test build #29806 has started for PR 5397 at commit |
There's a slight change in behavior here: in the old code, we never removed the stage from runningStages here even though we posted a SparkListenerStageCompleted event. I think this was probably a bug. In this updated code, markStageAsFinished will remove the stage from runningStages.
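To make the consolidation concrete, here is a minimal sketch of the idea, not the actual DAGScheduler code. `Stage`, `Coordinator`, `Scheduler`, `StageCompleted`, `submit`, and the `posted` buffer are simplified stand-ins invented for illustration. Only the method name `markStageAsFinished` and the three bookkeeping steps come from this thread.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins. None of these are Spark's real classes.
object StageCompletionSketch {
  final case class Stage(id: Int)
  final case class StageCompleted(stageId: Int, failureReason: Option[String])

  class Coordinator {
    // Assumed shape: stage id -> (partition id -> authorized task attempt).
    private val authorizedCommitters = mutable.Map[Int, mutable.Map[Int, Long]]()
    def stageStart(stageId: Int): Unit = {
      authorizedCommitters(stageId) = mutable.Map.empty[Int, Long]
    }
    def stageEnd(stageId: Int): Unit = {
      authorizedCommitters.remove(stageId)
      ()
    }
    def isEmpty: Boolean = authorizedCommitters.isEmpty
  }

  class Scheduler {
    val runningStages = mutable.HashSet[Stage]()
    val coordinator = new Coordinator
    // Stands in for the listener bus: records every event that was posted.
    val posted = mutable.ArrayBuffer[StageCompleted]()

    def submit(stage: Stage): Unit = {
      runningStages += stage
      coordinator.stageStart(stage.id)
    }

    // Single exit point for success and failure alike, so no path can post
    // the completion event while skipping the other two cleanup steps.
    def markStageAsFinished(stage: Stage, failureReason: Option[String] = None): Unit = {
      runningStages -= stage
      coordinator.stageEnd(stage.id)
      posted += StageCompleted(stage.id, failureReason)
    }
  }
}
```

With every completion path routed through one method, a failed stage is cleaned up the same way as a successful one, which is the behavior change described in the comment above.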
LGTM.
Test build #29803 has finished for PR 5397 at commit
Test PASSed.
Test build #29806 has finished for PR 5397 at commit
Test PASSed.
LGTM too.
Thanks for reviewing. I'm going to merge this into master and open a separate PR to backport to branch-1.3.
Actually, it appears that the merge conflicts with |
This patch fixes a memory leak in the DAGScheduler, which caused us to leak a map entry per submitted stage. The problem is that the OutputCommitCoordinator needs to be informed when stages end in order to remove entries from its `authorizedCommitters` map, but the DAGScheduler only called it in one of the four code paths that are used to mark stages as completed.

This patch fixes this issue by consolidating the processing of stage completion into a new `markStageAsFinished` method and updates DAGSchedulerSuite's `assertDataStructuresEmpty` assertion to also check the OutputCommitCoordinator data structures. I've also added a comment at the top of DAGScheduler so that we remember to update this test when adding new data structures.

Author: Josh Rosen <[email protected]>

Closes #5397 from JoshRosen/SPARK-6737 and squashes the following commits:

af3b02f [Josh Rosen] Consolidate stage completion handling code in a single method.
e96ce3a [Josh Rosen] Consolidate stage completion handling code in a single method.
3052aea [Josh Rosen] Comment update
7896899 [Josh Rosen] Fix SPARK-6737 by informing OutputCommitCoordinator of all stage end events.
4ead1dc [Josh Rosen] Add regression tests for SPARK-6737

(cherry picked from commit c83e039)
Signed-off-by: Josh Rosen <[email protected]>

Conflicts:
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
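For readers unfamiliar with the leak, the following is a rough sketch of how one map entry per stage accumulates when `stageEnd` is skipped, and of the kind of emptiness check a regression test can make. It is an illustration under assumptions, not the code in this patch. The `Coordinator` class, its `trackedStages` helper, and `runStages` are hypothetical, and the map's value type is a guess at the shape of `authorizedCommitters`.

```scala
import scala.collection.mutable

// Hypothetical model of the leak. Not Spark's OutputCommitCoordinator.
object CommitCoordinatorLeakSketch {
  class Coordinator {
    // Assumed shape: stage id -> (partition id -> authorized task attempt).
    private val authorizedCommitters = mutable.Map[Int, mutable.Map[Int, Long]]()

    def stageStart(stageId: Int): Unit = {
      authorizedCommitters(stageId) = mutable.Map.empty[Int, Long]
    }

    def stageEnd(stageId: Int): Unit = {
      authorizedCommitters.remove(stageId)
      ()
    }

    // The kind of check a test can assert on once all jobs have finished.
    def isEmpty: Boolean = authorizedCommitters.isEmpty

    // Illustration-only helper to make the leak size visible.
    def trackedStages: Int = authorizedCommitters.size
  }

  // Starts `numStages` stages. When `notifyOnEnd` is false, this models a
  // completion path that never tells the coordinator the stage has ended.
  def runStages(numStages: Int, notifyOnEnd: Boolean): Coordinator = {
    val coordinator = new Coordinator
    for (id <- 0 until numStages) {
      coordinator.stageStart(id)
      if (notifyOnEnd) coordinator.stageEnd(id)
    }
    coordinator
  }
}
```

In this model, `runStages(100, notifyOnEnd = false)` leaves 100 entries behind, while `runStages(100, notifyOnEnd = true)` leaves the coordinator empty. Asserting `isEmpty` at the end of a test is what catches a completion path that forgets the notification.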