Skip to content

Conversation

@012huang
Copy link
Contributor

What changes were proposed in this pull request?

An app terminated abnormal sometimes may cause shuffe service memory leak. In one of our production cases, the app failed for Stage cancelled as SparkContext has already shut down. the strange is there are still requests for fetch shuffle data and cause error in server side as below:

2019-12-08 22:23:33,375 ERROR server.TransportRequestHandler (TransportRequestHandler.java:processFetchRequest(132)) - Error opening block StreamChunkId{streamId=1902064894814, chunkIndex=0} for request from /10.221.115.175:38582
java.lang.RuntimeException: Executor is not registered (appId=application_1574499669561_954327, execId=4514)

the client sie also show corresponding log like this:

org.apache.spark.shuffle.FetchFailedException: Failure while fetching StreamChunkId{streamId=1902064894814, chunkIndex=0}: java.lang.RuntimeException: Executor is not registered (appId=application_1574499669561_954327, execId=4514)

in some cases, the request for OpenBlocks or FetchShuffleBlocks is still on the fly. In the code ExternalBlockHandler#handleMessage, it will register a StreamState to OneForOneStreamManager#streams, then reply an success response to client unconditionally , the client receive the response and then fire ChunkFetchRequest to fetch chunk, but at this time, the app has got event APPLICATION_STOP and executed ExternalShuffleService#applicationRemoved method to clean the app's ExecutorShuffleInfo, this made Executor is not registered error happended. even though when the client channel is closing, the TransportRequestHandler#channelInactive was called to clean the StreamState with relate channel, but when cleanning the StreamState buffter, it also lookup ManagedBuffer with appId and execId info which have been cleaned in executors object. we can also find the log: StreamManager connectionTerminated() callback failed in NM's log file.

so, when an OpenBlocks request come, we should lookup ExternalShuffleBlockResolver#executors , if the realted app is exited, we should not registering a StreamState then just close the client and reply a faild response.
In addition, when an app get APPLICATION_STOP to call applicationRemoved, we should clean the the related streamState before ExecutorShuffleInfo has been cleaned, this is what the PR changes and prevents the shuffle service memory leak.

Why are the changes needed?

The external shuffle service memory leak has a great impact on cluster with dynanic on and may cause NM crash.

Does this PR introduce any user-facing change?

No

How was this patch tested?

add ut

@012huang 012huang changed the title fix spark external shuffle service memory leak [SPARK-30246][CORE][SHUFFLE] fix spark external shuffle service memory leak Jan 14, 2020
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@012huang
Copy link
Contributor Author

cc. @viirya can you help review this? thanks

@vanzin
Copy link
Contributor

vanzin commented Jan 15, 2020

Your fix is very different, but from your explanation in the PR, it sounds like #27064 will also fix your problem?

@viirya
Copy link
Member

viirya commented Jan 15, 2020

Yea, for memory leak, I think #27064 should fix it.

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Jan 17, 2020

According to the above discussion, I'll close this PR because #27064 is merged.
Thank you, @012huang and all!

@012huang
Copy link
Contributor Author

ok, thank you all!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants