[SPARK-30623][Core] Spark external shuffle allow disable of separate event loop group #27665
Conversation
|
Test build #118772 has finished for PR 27665 at commit
|
When we remove await, we see more SASL requests timing out. I mentioned this in the comment here:
#22173 (comment)
Wouldn't making async mode the default make this problem worse?
In async mode, how do you plan to tackle the increased number of SASL failures?
Thanks for the comment @otterc!
This PR aims to fix the performance regression brought by the sync-mode fetching. As you mentioned, you found a problem with your stress test framework; I think the default mode should be the guarantee for common use cases.
For the SASL requests timing out, maybe we need more context on your internal stress test framework to analyze the root cause. IMO, there should be other configs under spark.shuffle/netty that could help, rather than depending only on the async/sync mode here.
|
So I actually filed SPARK-30623 for this, please update the title and things. I don't think we need a separate feature config for this; as that JIRA mentioned, I think the idea was just: if the config isn't explicitly set, then do synchronous mode. |
|
@xuanyuanking we worked on the original fix of this issue. Having await in there is the key to the benefits provided in SPARK-24355, which improves reliability of Spark shuffle in a reasonably scaled deployment. This issue seems common across companies like us (LinkedIn), Netflix, Uber, and Yahoo. As mentioned in #22173, what we observed is that in cases where HDD is used for shuffle storage, the disk is saturated before the network can be. So, for a reasonably scaled deployment, having this fix provides a boost in shuffle reliability without hurting much on the performance side. This is also validated by @tgravescs in the Yahoo deployment of this patch. It's reasonable to introduce another config that disables this reliability improvement if it leads to performance regression in certain deployment modes. Just want to see whether we should leave this enabled by default or not. Also, as mentioned in #22173, we have discovered a potential fix for this perf regression that does not remove its reliability benefits. It will take some extra time on our side to evaluate that fix, which is a fix inside Netty. Want to make sure the broader community knows what we have been doing for this issue, so we do not take away a potential reliability improvement to Spark. |
|
Policy-wise, we should not fix an issue while introducing a regression. I'm OK to have it since it's disabled by default and it does fix a common issue. What I'm asking for is to make sure the code path is exactly the same as before when this feature is disabled, so that there is no regression. +1 with @tgravescs to reuse the existing config. |
743e566 to 5015f60 (force-pushed)
|
Do some refactoring to reuse the logic of processing fetch requests. Reuse the config |
   */
  public boolean separateChunkFetchRequest() {
    try {
      conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent");
Is there no API to check conf existence?
Yes, no API in ConfigProvider.
how about conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0?
Thanks, done in 4f42083.
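The idiom agreed on above — a zero default that turns `getInt` into a presence check — can be sketched against a minimal config map. The class and helper names here are illustrative stand-ins, not Spark's actual `TransportConf`/`ConfigProvider`:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a ConfigProvider that offers get/getInt but no
// containsKey-style existence check (the situation discussed above).
class MiniConf {
    private final Map<String, String> values = new HashMap<>();

    void set(String key, String value) {
        values.put(key, value);
    }

    // Mirrors a getInt(key, default) accessor: falls back when the key is unset.
    int getInt(String key, int defaultValue) {
        String v = values.get(key);
        return v == null ? defaultValue : Integer.parseInt(v);
    }

    // The reviewed idiom: with a zero default, getInt doubles as a presence
    // check, since any explicitly configured positive percent reads as "set".
    boolean separateChunkFetchRequest() {
        return getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
    }
}
```

One caveat of this idiom: an explicit setting of `0` is indistinguishable from "unset", which is acceptable here because zero chunk-fetch handler threads and no separate event loop group amount to the same thing.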
      return 0;
    }
    int chunkFetchHandlerThreadsPercent =
      conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
what's wrong with the previous code?
No need to give a default value here; by the time execution reaches this point, the config must be set.
What do you mean by the config must be set, @xuanyuanking ? What value do you expect by default? Apparently, this seems to revert SPARK-25641 together without mentioning SPARK-25641. In the PR, only SPARK-24355 is mentioned.
No need to give a default value here; by the time execution reaches this point, the config must be set.
Because we only call this method if separateChunkFetchRequest returns true.
We will see an exception if the assumption is broken.
In this PR, we make the separate event loop group configurable by checking whether the config spark.shuffle.server.chunkFetchHandlerThreadsPercent is set or not.
What do you mean by the config must be set
Here the function chunkFetchHandlerThreads is only called when the config is set.
spark/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
Lines 124 to 129 in 0fe203e
    if (conf.getModuleName() != null &&
        conf.getModuleName().equalsIgnoreCase("shuffle") &&
        !isClientOnly && conf.separateChunkFetchRequest()) {
      chunkFetchWorkers = NettyUtils.createEventLoop(
        IOMode.valueOf(conf.ioMode()),
        conf.chunkFetchHandlerThreads(),
What value do you expect by default? Apparently, this seems to revert SPARK-25641 together without mentioning SPARK-25641.
Yes, this PR makes the feature disabled by default; let me also mention SPARK-25641 in the PR description.
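Putting the thread together, the shape of the two accessors after this PR can be sketched as follows. This is a simplified illustration, not Spark's actual TransportConf (which derives the server thread count from other settings); the precondition comment mirrors the point above that chunkFetchHandlerThreads is only reached once the presence check has passed:

```java
import java.util.Map;

// Simplified sketch of the gating discussed above: the percent config's mere
// presence enables the separate chunk-fetch event loop group, and the
// thread-count accessor is only called after that check, so it needs no
// default value.
class ChunkFetchConfSketch {
    private static final String KEY =
        "spark.shuffle.server.chunkFetchHandlerThreadsPercent";

    private final Map<String, String> conf;
    private final int serverThreads; // stand-in for the Netty server thread count

    ChunkFetchConfSketch(Map<String, String> conf, int serverThreads) {
        this.conf = conf;
        this.serverThreads = serverThreads;
    }

    boolean separateChunkFetchRequest() {
        return conf.containsKey(KEY);
    }

    // Precondition: separateChunkFetchRequest() returned true; a missing key
    // here throws, surfacing a broken assumption as an exception.
    int chunkFetchHandlerThreads() {
        int percent = Integer.parseInt(conf.get(KEY));
        return (int) Math.ceil(serverThreads * (percent / 100.0));
    }
}
```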
Thank you, @xuanyuanking and @cloud-fan .
  private final TransportRequestHandler requestHandler;
  private final long requestTimeoutNs;
  private final boolean closeIdleConnections;
  private final boolean separateChunkFetchRequest;
maybe a more explicit name: skipChunkFetchRequest
Thanks
...n/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
      TransportClient reverseClient,
      RpcHandler rpcHandler,
      Long maxChunksBeingTransferred) {
    super(reverseClient, rpcHandler.getStreamManager(), maxChunksBeingTransferred);
do we still need the maxChunksBeingTransferred variable in this class?
Yes, it is still in use in processStreamRequest.
...n/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
|
Test build #120190 has finished for PR 27665 at commit
|
|
Test build #120192 has finished for PR 27665 at commit
|
|
Test build #120196 has finished for PR 27665 at commit
|
|
Test build #120229 has finished for PR 27665 at commit
|
da8abd3 to d4e0352 (force-pushed)
|
Test build #120274 has finished for PR 27665 at commit
|
|
|
    @Test
-   public void handleStreamRequest() {
+   public void handleStreamRequest() throws Exception {
is this change necessary?
Yes, because of the changes for TransportRequestHandler.handler here: https://github.com/apache/spark/pull/27665/files#diff-0e3429029d3f8d49e94ef11e4e3051a2R105
|
Test build #120273 has finished for PR 27665 at commit
|
|
retest this please |
|
Test build #120311 has finished for PR 27665 at commit
|
|
Test build #120310 has finished for PR 27665 at commit
|
|
retest this please |
|
Test build #120326 has finished for PR 27665 at commit
|
|
thanks, merging to master/3.0! |
|
Hi, @cloud-fan . This seems to be not in |
…event loop group ### What changes were proposed in this pull request? Fix the regression caused by #22173. The original PR changed the logic of handling `ChunkFetchRequest` from async to sync, which caused the shuffle benchmark regression. This PR restores the async mode by reusing the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent`. When the user sets the config, `ChunkFetchRequest` will be processed in a separate event loop group; otherwise, the code path is exactly the same as before. ### Why are the changes needed? Fix the shuffle performance regression described in #22173 (comment) ### Does this PR introduce any user-facing change? Yes, this PR disables the separate event loop for FetchRequest by default. ### How was this patch tested? Existing UT. Closes #27665 from xuanyuanking/SPARK-24355-follow. Authored-by: Yuanjian Li <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 0fe203e) Signed-off-by: Wenchen Fan <[email protected]>
|
it is now. The merge script runs slowly on my side... |
|
Thank you! |
|
BTW, @xuanyuanking . |
|
Thanks for the review. |
What changes were proposed in this pull request?
Fix the regression caused by #22173.
The original PR changed the logic of handling `ChunkFetchRequest` from async to sync, which caused the shuffle benchmark regression. This PR restores the async mode by reusing the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent`. When the user sets the config, `ChunkFetchRequest` will be processed in a separate event loop group; otherwise, the code path is exactly the same as before.
As the creation of the separate event loop group is disabled by default, this PR also is a kind of revert for SPARK-25641.
Why are the changes needed?
Fix the shuffle performance regression described in #22173 (comment)
Does this PR introduce any user-facing change?
Yes, this PR disables the separate event loop for FetchRequest by default.
How was this patch tested?
Existing UT.
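With this change the separate event loop group is opt-in. A deployment that still wants the SPARK-24355 behavior would set the reused config explicitly, e.g. as below; the percent value shown is only an example, and any positive value enables the separate group:

```shell
# Re-enable the separate chunk-fetch event loop group by setting the reused
# config; leaving it unset keeps the async code path, which is the default
# after this PR.
spark-submit \
  --conf spark.shuffle.server.chunkFetchHandlerThreadsPercent=100 \
  ...
```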