-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data #32140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Adding @Victsm @mridulm @zhouyejoe |
|
Will resolve conflicts when this PR is merged as. |
|
I have rebased the changes against the latest master. It only depends on #32140 but it interfaces at just 1 place with that PR so it can still be reviewed in parallel. |
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: Most of this is as before. Have added conditions for shuffleChunks
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to reviewers: Both of these are created locally in the initialize and fetchFallbackBlocks. With push-based shuffle, partitionBlocksByFetchMode can be called multiple times because failure to fetch merged shuffle blocks/chunks (fallback) finds original blocks that made up the failed merged block/chunk and then we need to create new FetchRequests from these.
|
Gentle ping to help review this PR @tgravescs @attilapiros @Ngone51 @mridulm @Victsm @zhouyejoe |
mridulm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took an initial pass, yet to look at ShuffleBlockFetcherIterator or test suites.
I am wondering, given the volume, whether we want to split between ESS side and client side. Thoughts ?
common/network-common/src/main/java/org/apache/spark/network/client/BaseResponseCallback.java
Outdated
Show resolved
Hide resolved
...n/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
Outdated
Show resolved
Hide resolved
...n/network-common/src/main/java/org/apache/spark/network/protocol/MergedBlockMetaRequest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getNumBlocks makes this code cleaner.
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlockChunks.java
Outdated
Show resolved
Hide resolved
Thanks Mridul for reviewing! That being said, I am still okay to break this change into client/sever PRs if that makes the review easier for the reviewers. |
…erIterator.scala Co-authored-by: Mridul Muralidharan <[email protected]>
|
@mridulm Resolved the conflict. |
mridulm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just had a minor nit - can you fix it pls ?
Will merge it once you are done.
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
|
Sorry for the delay. I'll do a review today. BTW, are there any other necessary magnet PRs that have to be merged for the 3.2 cut/release? |
There are 2 pending tasks which are necessary for Magnet:
|
|
The test failures are unrelated. |
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
| s"(${Utils.bytesToString(pushMergedLocalBlockBytes)}) " + | ||
| s"local push-merged and $numRemoteBlocks (${Utils.bytesToString(remoteBlockBytes)}) " + | ||
| s"remote blocks") | ||
| this.hostLocalBlocks ++= hostLocalBlocksCurrentIteration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we reuse hostLocalBlocksByExecutor here? e.g.,
this.hostLocalBlocks ++= hostLocalBlocksByExecutor.values
.flatMap { infos => infos.map(info => (info._1, info._3)) }so we can get rid of hostLocalBlocksCurrentIteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would need to do something for finding out the number of hostLocalBlocks in the assertions before as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have made this change but also added a var for counting num of hostLocalBlocks which is needed for the assertions. PTAL
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
|
I left some minor comments. I think we're ready to merge after addressing these comments. |
|
Test build #140388 has started for PR 32140 at commit |
|
Merged to master |
…utor tries to fetch push-merged blocks ### What changes were proposed in this pull request? Below 2 bugs were introduced with #32140 1. Instead of requesting the local-dirs for push-merged-local blocks from the ESS, `PushBasedFetchHelper` requests it from other executors. Push-based shuffle is only enabled when the ESS is enabled so it should always fetch the dirs from the ESS and not from other executors which is not yet supported. 2. The size of the push-merged blocks is logged incorrectly. ### Why are the changes needed? This fixes the above mentioned bugs and is needed for push-based shuffle to work properly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested this by running an application on the cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs` which is why didn't catch (1) with the UT. However, the fix is trivial and checking this in the UT will require a lot more effort so I haven't modified it in the UT. Logs of the executor with the bug ``` 21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger] 21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92. at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163) ``` After the fix, the executors were able to fetch the local push-merged blocks. Closes #33378 from otterc/SPARK-32922-followup. Authored-by: Chandni Singh <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…utor tries to fetch push-merged blocks ### What changes were proposed in this pull request? Below 2 bugs were introduced with #32140 1. Instead of requesting the local-dirs for push-merged-local blocks from the ESS, `PushBasedFetchHelper` requests it from other executors. Push-based shuffle is only enabled when the ESS is enabled so it should always fetch the dirs from the ESS and not from other executors which is not yet supported. 2. The size of the push-merged blocks is logged incorrectly. ### Why are the changes needed? This fixes the above mentioned bugs and is needed for push-based shuffle to work properly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested this by running an application on the cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs` which is why didn't catch (1) with the UT. However, the fix is trivial and checking this in the UT will require a lot more effort so I haven't modified it in the UT. Logs of the executor with the bug ``` 21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger] 21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92. at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163) ``` After the fix, the executors were able to fetch the local push-merged blocks. Closes #33378 from otterc/SPARK-32922-followup. Authored-by: Chandni Singh <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 6d2cbad) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
…l and remote merged shuffle data ### What changes were proposed in this pull request? This is the shuffle fetch side change where executors can fetch local/remote push-merged shuffle data from shuffle services. This is needed for push-based shuffle - SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). The change adds support to the `ShuffleBlockFetchIterator` to fetch push-merged block meta and shuffle chunks from local and remote ESS. If the fetch of any of these fails, then the iterator fallsback to fetch the original shuffle blocks that belonged to the push-merged block. ### Why are the changes needed? These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). ### Does this PR introduce _any_ user-facing change? When push-based shuffle is turned on then that will fetch push-merged blocks from the remote shuffle service. The client logs will indicate this. ### How was this patch tested? Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). We have already verified the functionality and the improved performance as documented in the SPIP doc. Lead-authored-by: Chandni Singh chsinghlinkedin.com Co-authored-by: Min Shen mshenlinkedin.com Co-authored-by: Ye Zhou yezhoulinkedin.com Closes apache#32140 from otterc/SPARK-32922. Lead-authored-by: Chandni Singh <[email protected]> Co-authored-by: Chandni Singh <[email protected]> Co-authored-by: Min Shen <[email protected]> Co-authored-by: otterc <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…l and remote merged shuffle data This is the shuffle fetch side change where executors can fetch local/remote push-merged shuffle data from shuffle services. This is needed for push-based shuffle - SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). The change adds support to the `ShuffleBlockFetchIterator` to fetch push-merged block meta and shuffle chunks from local and remote ESS. If the fetch of any of these fails, then the iterator fallsback to fetch the original shuffle blocks that belonged to the push-merged block. These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). When push-based shuffle is turned on then that will fetch push-merged blocks from the remote shuffle service. The client logs will indicate this. Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). We have already verified the functionality and the improved performance as documented in the SPIP doc. Lead-authored-by: Chandni Singh chsinghlinkedin.com Co-authored-by: Min Shen mshenlinkedin.com Co-authored-by: Ye Zhou yezhoulinkedin.com Closes #32140 from otterc/SPARK-32922. Lead-authored-by: Chandni Singh <[email protected]> Co-authored-by: Chandni Singh <[email protected]> Co-authored-by: Min Shen <[email protected]> Co-authored-by: otterc <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…utor tries to fetch push-merged blocks ### What changes were proposed in this pull request? Below 2 bugs were introduced with #32140 1. Instead of requesting the local-dirs for push-merged-local blocks from the ESS, `PushBasedFetchHelper` requests it from other executors. Push-based shuffle is only enabled when the ESS is enabled so it should always fetch the dirs from the ESS and not from other executors which is not yet supported. 2. The size of the push-merged blocks is logged incorrectly. ### Why are the changes needed? This fixes the above mentioned bugs and is needed for push-based shuffle to work properly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested this by running an application on the cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs` which is why didn't catch (1) with the UT. However, the fix is trivial and checking this in the UT will require a lot more effort so I haven't modified it in the UT. Logs of the executor with the bug ``` 21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger] 21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92. at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163) ``` After the fix, the executors were able to fetch the local push-merged blocks. Closes #33378 from otterc/SPARK-32922-followup. Authored-by: Chandni Singh <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
What changes were proposed in this pull request?
This is the shuffle fetch side change where executors can fetch local/remote push-merged shuffle data from shuffle services. This is needed for push-based shuffle - SPIP SPARK-30602.
The change adds support to the
ShuffleBlockFetchIteratorto fetch push-merged block meta and shuffle chunks from local and remote ESS. If the fetch of any of these fails, then the iterator fallsback to fetch the original shuffle blocks that belonged to the push-merged block.Why are the changes needed?
These changes are needed for push-based shuffle. Refer to the SPIP in SPARK-30602.
Does this PR introduce any user-facing change?
When push-based shuffle is turned on then that will fetch push-merged blocks from the remote shuffle service. The client logs will indicate this.
How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Chandni Singh [email protected]
Co-authored-by: Min Shen [email protected]
Co-authored-by: Ye Zhou [email protected]