[SPARK-30246][CORE][SHUFFLE] Fix Spark external shuffle memory leak #27060
What changes were proposed in this pull request?
An application that finishes abnormally can sometimes cause a memory leak in the external shuffle service. In one of our production cases, the app failed with its stage cancelled because the SparkContext had already shut down. Strangely, there were still requests to fetch shuffle data, which caused errors on the server side, and the client side logged corresponding errors.

In some cases, an `OpenBlocks` request is still in flight. In `ExternalShuffleBlockHandler#handleMessage`, the handler registers a `StreamState` in `OneForOneStreamManager#streams` and then unconditionally replies to the client with a success response. The client receives the response and sends a `ChunkFetchRequest` to fetch a chunk. By this time, however, the app has received the `APPLICATION_STOP` event and executed `ExternalShuffleService#applicationRemoved` to clean up the app's `ExecutorShuffleInfo`, which produces the `Executor is not registered` error. When the client channel closes, `TransportRequestHandler#channelInactive` is called to clean up the `StreamState` associated with that channel, but cleaning up the `StreamState` buffers also looks up `ManagedBuffer` using the `appId` and `execId`, which have already been removed from the `executors` object. The message `StreamManager connectionTerminated() callback failed` also appears in the NM's log file.

So, when an `OpenBlocks` request arrives, we should look up `ExternalShuffleBlockResolver#executors`. If the related app has exited, we should not register a `StreamState` and should just close the client connection (or reply with a special message that the client side handles). And when an app receives `APPLICATION_STOP` and calls `applicationRemoved`, we should clean up the related `StreamState` before the `ExecutorShuffleInfo` is cleaned. This is what the PR changes, and it prevents the shuffle service memory leak.
Why are the changes needed?
The external shuffle service memory leak has a large impact on clusters with dynamic allocation enabled and may cause the NM to crash.
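The two changes described under "What changes were proposed" (reject `OpenBlocks` for an app that is no longer registered, and drop an app's streams before its executor info) can be illustrated with a minimal sketch. This is a simplified model, not the patch itself: the class `ShuffleCleanupSketch`, its methods, the `"appId/execId"` key format, and the `-1` rejection value are all hypothetical stand-ins for the real Spark classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the shuffle service state; not Spark's actual classes.
class ShuffleCleanupSketch {
    // Stands in for ExternalShuffleBlockResolver#executors, keyed by "appId/execId" (assumed format).
    private final Map<String, String> executors = new ConcurrentHashMap<>();
    // Stands in for OneForOneStreamManager#streams: streamId -> owning appId.
    private final Map<Long, String> streams = new ConcurrentHashMap<>();
    private final AtomicLong nextStreamId = new AtomicLong();

    void registerExecutor(String appId, String execId, String shuffleInfo) {
        executors.put(appId + "/" + execId, shuffleInfo);
    }

    // Returns the new stream id, or -1 if the request is rejected.
    long handleOpenBlocks(String appId, String execId) {
        // Change 1: do not register a stream for an executor that is no longer registered.
        // Note: this check-then-register is not atomic; the sketch ignores that race.
        if (!executors.containsKey(appId + "/" + execId)) {
            return -1L;
        }
        long streamId = nextStreamId.getAndIncrement();
        streams.put(streamId, appId);
        return streamId;
    }

    void applicationRemoved(String appId) {
        // Change 2: release the app's streams first, while executor info still exists...
        streams.values().removeIf(appId::equals);
        // ...and only then remove the executor info.
        executors.keySet().removeIf(key -> key.startsWith(appId + "/"));
    }

    int openStreamCount() {
        return streams.size();
    }
}
```

In this model, a late `handleOpenBlocks` call made after `applicationRemoved` is rejected and leaves no stream entry behind, which is the leak the description attributes to the unconditional success reply.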
Does this PR introduce any user-facing change?
No
How was this patch tested?
With existing unit tests.