Commit 595e825

otterc authored and Mridul Muralidharan committed
[SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks
### What changes were proposed in this pull request?

The following two bugs were introduced with #32140:

1. Instead of requesting the local dirs for push-merged-local blocks from the external shuffle service (ESS), `PushBasedFetchHelper` requests them from other executors. Push-based shuffle is only enabled when the ESS is enabled, so it should always fetch the dirs from the ESS and not from other executors, which is not yet supported.
2. The size of the push-merged blocks is logged incorrectly.

### Why are the changes needed?

This fixes the bugs listed above and is needed for push-based shuffle to work properly.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Tested by running an application on the cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs`, which is why they did not catch (1). However, the fix is trivial and checking this in the UT would require a lot more effort, so I haven't modified the UT.

Logs of the executor with the bug:

```
21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger]
21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead
java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92.
	at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
```

After the fix, the executors were able to fetch the local push-merged blocks.

Closes #33378 from otterc/SPARK-32922-followup.

Authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d2cbad)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
1 parent d5022c3 commit 595e825

2 files changed: 12 additions & 8 deletions


core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala

Lines changed: 9 additions & 5 deletions
@@ -36,8 +36,8 @@ import org.apache.spark.storage.ShuffleBlockFetcherIterator._
  * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
  * functionality to fetch push-merged block meta and shuffle chunks.
  * A push-merged block contains multiple shuffle chunks where each shuffle chunk contains multiple
- * shuffle blocks that belong to the common reduce partition and were merged by the ESS to that
- * chunk.
+ * shuffle blocks that belong to the common reduce partition and were merged by the
+ * external shuffle service to that chunk.
  */
 private class PushBasedFetchHelper(
    private val iterator: ShuffleBlockFetcherIterator,
@@ -197,9 +197,13 @@ private class PushBasedFetchHelper(
           localShuffleMergerBlockMgrId)
       }
     } else {
-      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged dirs")
-      hostLocalDirManager.getHostLocalDirs(localShuffleMergerBlockMgrId.host,
-        localShuffleMergerBlockMgrId.port, Array(SHUFFLE_MERGER_IDENTIFIER)) {
+      // Push-based shuffle is only enabled when the external shuffle service is enabled. If the
+      // external shuffle service is not enabled, then there will not be any push-merged blocks
+      // for the iterator to fetch.
+      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged " +
+        s"dirs from the external shuffle service")
+      hostLocalDirManager.getHostLocalDirs(blockManager.blockManagerId.host,
+        blockManager.externalShuffleServicePort, Array(SHUFFLE_MERGER_IDENTIFIER)) {
         case Success(dirs) =>
           logDebug(s"Fetched merged dirs in " +
             s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
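
The effect of the `getHostLocalDirs` change in the hunk above can be illustrated with a minimal, standalone sketch. It is not Spark code: `Endpoint`, `ExecutorEnv`, and `MergedDirsEndpoint` are hypothetical names, the port values are made up for illustration, and the sketch assumes that the merger's block manager id carried the port of an executor's block server, as the `NettyBlockRpcServer` frame in the logged stack trace suggests.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark classes.
final case class Endpoint(host: String, port: Int)

final case class ExecutorEnv(
    host: String,
    blockManagerPort: Int,           // assumed: port of the executor's own block server
    externalShuffleServicePort: Int) // port of the shuffle service on the same host

object MergedDirsEndpoint {
  // Before the fix (assumption): the request for merged dirs went to the
  // address stored in the merger's block manager id, i.e. an executor.
  def beforeFix(env: ExecutorEnv): Endpoint =
    Endpoint(env.host, env.blockManagerPort)

  // After the fix: the request goes to the external shuffle service on the
  // local host, which is the process that wrote the push-merged files.
  def afterFix(env: ExecutorEnv): Endpoint =
    Endpoint(env.host, env.externalShuffleServicePort)

  def main(args: Array[String]): Unit = {
    val env = ExecutorEnv("host-a", blockManagerPort = 40123, externalShuffleServicePort = 7337)
    println(beforeFix(env)) // Endpoint(host-a,40123)
    println(afterFix(env))  // Endpoint(host-a,7337)
  }
}
```

Only the port differs between the two endpoints, which is consistent with the request in the buggy logs being rejected by an executor that expected its own executor id rather than `shuffle-push-merger`.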

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 3 additions & 3 deletions
@@ -386,7 +386,7 @@ final class ShuffleBlockFetcherIterator(
         if (address.host == blockManager.blockManagerId.host) {
           numBlocksToFetch += blockInfos.size
           pushMergedLocalBlocks ++= blockInfos.map(_._1)
-          pushMergedLocalBlockBytes += blockInfos.map(_._3).sum
+          pushMergedLocalBlockBytes += blockInfos.map(_._2).sum
         } else {
           collectFetchRequests(address, blockInfos, collectedRemoteRequests)
         }
@@ -886,8 +886,8 @@ final class ShuffleBlockFetcherIterator(
           // blockId is a ShuffleBlockChunkId.
           // 2. Failure to read the push-merged-local meta. In this case, the blockId is
           // ShuffleBlockId.
-          // 3. Failure to get the push-merged-local directories from the ESS. In this case, the
-          // blockId is ShuffleBlockId.
+          // 3. Failure to get the push-merged-local directories from the external shuffle service.
+          // In this case, the blockId is ShuffleBlockId.
           if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
             numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
             bytesInFlight -= size
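
The `_._3` to `_._2` change to `pushMergedLocalBlockBytes` can be illustrated with a small standalone sketch. It assumes each entry of `blockInfos` is a `(blockId, sizeInBytes, mapIndex)` tuple, which is what the fix implies but is not shown in this diff. Block ids are plain strings here, `PushMergedBytes` is a hypothetical name, and the sizes and the `-1` map index are made-up sample values.

```scala
object PushMergedBytes {
  // Assumed tuple layout: (blockId, sizeInBytes, mapIndex).
  type BlockInfo = (String, Long, Int)

  // Mirrors the buggy expression: sums the third field, which is not a size.
  def sumOfThirdField(blockInfos: Seq[BlockInfo]): Long =
    blockInfos.map(_._3).sum.toLong

  // Mirrors the fixed expression: sums the second field, the size in bytes.
  def sumOfSecondField(blockInfos: Seq[BlockInfo]): Long =
    blockInfos.map(_._2).sum

  def main(args: Array[String]): Unit = {
    val blockInfos: Seq[BlockInfo] = Seq(
      ("shuffle_0_-1_13", 1024L, -1),
      ("shuffle_0_-1_14", 2048L, -1))
    println(sumOfThirdField(blockInfos))  // -2
    println(sumOfSecondField(blockInfos)) // 3072
  }
}
```

Both expressions compile because both fields are numeric, which is one plausible reason the wrong index was not caught earlier.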
