Skip to content

Commit ad89a02

Browse files
committed
Addressed review comments
1 parent c10b943 commit ad89a02

File tree

3 files changed

+44
-52
lines changed

3 files changed

+44
-52
lines changed

core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ private class PushBasedFetchHelper(
7171
}
7272

7373
/**
74-
* Returns true if the address is of a local push-merged block. false otherwise.
74+
* Returns true if the address is of a push-merged-local block, false otherwise.
7575
*/
7676
def isLocalPushMergedBlockAddress(address: BlockManagerId): Boolean = {
7777
isPushMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host
@@ -190,14 +190,14 @@ private class PushBasedFetchHelper(
190190
val cachedPushedMergedDirs = hostLocalDirManager.getCachedHostLocalDirsFor(
191191
SHUFFLE_MERGER_IDENTIFIER)
192192
if (cachedPushedMergedDirs.isDefined) {
193-
logDebug(s"Fetch the local push-merged blocks with cached merged dirs: " +
193+
logDebug(s"Fetch the push-merged-local blocks with cached merged dirs: " +
194194
s"${cachedPushedMergedDirs.get.mkString(", ")}")
195195
pushMergedLocalBlocks.foreach { blockId =>
196196
fetchPushMergedLocalBlock(blockId, cachedPushedMergedDirs.get,
197197
localShuffleMergerBlockMgrId)
198198
}
199199
} else {
200-
logDebug(s"Asynchronous fetch the local push-merged blocks without cached merged dirs")
200+
logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged dirs")
201201
hostLocalDirManager.getHostLocalDirs(localShuffleMergerBlockMgrId.host,
202202
localShuffleMergerBlockMgrId.port, Array(SHUFFLE_MERGER_IDENTIFIER)) {
203203
case Success(dirs) =>
@@ -211,9 +211,9 @@ private class PushBasedFetchHelper(
211211
localShuffleMergerBlockMgrId)
212212
}
213213
case Failure(throwable) =>
214-
// If we see an exception with getting the local dirs for local push-merged blocks,
214+
// If we see an exception with getting the local dirs for push-merged-local blocks,
215215
// we fallback to fetch the original blocks. We do not report block fetch failure.
216-
logWarning(s"Error while fetching the merged dirs for local push-merged " +
216+
logWarning(s"Error while fetching the merged dirs for push-merged-local " +
217217
s"blocks: ${pushMergedLocalBlocks.mkString(", ")}. Fetch the original blocks instead",
218218
throwable)
219219
pushMergedLocalBlocks.foreach {
@@ -226,7 +226,7 @@ private class PushBasedFetchHelper(
226226
}
227227

228228
/**
229-
* Fetch a single local push-merged block generated. This can also be executed by the task thread
229+
* Fetch a single push-merged-local block. This can be executed by the task thread
230230
* as well as the netty thread.
231231
* @param blockId ShuffleBlockId to be fetched
232232
* @param localDirs Local directories where the push-merged shuffle files are stored
@@ -244,10 +244,10 @@ private class PushBasedFetchHelper(
244244
localDirs))
245245
} catch {
246246
case e: Exception =>
247-
// If we see an exception with reading a local push-merged meta, we fallback to
247+
// If we see an exception with reading a push-merged-local meta, we fallback to
248248
// fetch the original blocks. We do not report block fetch failure
249249
// and will continue with the remaining local block read.
250-
logWarning(s"Error occurred while fetching local push-merged meta, " +
250+
logWarning(s"Error occurred while fetching push-merged-local meta, " +
251251
s"prepare to fetch the original blocks", e)
252252
iterator.addToResultsQueue(
253253
FallbackOnPushMergedFailureResult(blockId, blockManagerId, 0, isNetworkReqDone = false))
@@ -270,7 +270,7 @@ private class PushBasedFetchHelper(
270270
* finds more push-merged requests to remote and again updates it with additional requests for
271271
* original blocks.
272272
* The fallback happens when:
273-
* 1. There is an exception while creating shuffle chunks from local push-merged shuffle block.
273+
* 1. There is an exception while creating shuffle chunks from push-merged-local shuffle block.
274274
* See fetchLocalBlock.
275275
* 2. There is a failure when fetching remote shuffle chunks.
276276
* 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk
@@ -286,7 +286,7 @@ private class PushBasedFetchHelper(
286286
val fallbackBlocksByAddr: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] =
287287
blockId match {
288288
case shuffleBlockId: ShuffleBlockId =>
289-
iterator.incrementNumBlocksToFetch(-1)
289+
iterator.decreaseNumBlocksToFetch(1)
290290
mapOutputTracker.getMapSizesForMergeResult(
291291
shuffleBlockId.shuffleId, shuffleBlockId.reduceId)
292292
case _ =>
@@ -311,7 +311,7 @@ private class PushBasedFetchHelper(
311311
// These blocks were added to numBlocksToFetch so we increment blocksProcessed
312312
blocksProcessed += pendingShuffleChunks.size
313313
}
314-
iterator.incrementNumBlocksToFetch(-blocksProcessed)
314+
iterator.decreaseNumBlocksToFetch(blocksProcessed)
315315
mapOutputTracker.getMapSizesForMergeResult(
316316
shuffleChunkId.shuffleId, shuffleChunkId.reduceId, chunkBitmap)
317317
}

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils}
6060
* Note that zero-sized blocks are already excluded, which happened in
6161
* [[org.apache.spark.MapOutputTracker.convertMapStatuses]].
6262
* @param mapOutputTracker [[MapOutputTracker]] for falling back to fetching the original blocks if
63-
* we fail to fetch shuffle chunks when push based shuffle is enabled.
63+
* we fail to fetch shuffle chunks when push based shuffle is enabled.
6464
* @param streamWrapper A function to wrap the returned input stream.
6565
* @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
6666
* @param maxReqsInFlight max number of remote requests to fetch blocks at any given point.
@@ -371,9 +371,9 @@ final class ShuffleBlockFetcherIterator(
371371
// blocks. Remote blocks are further split into FetchRequests of size at most maxBytesInFlight
372372
// in order to limit the amount of data in flight
373373
val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
374-
val hostLocalBlocksCurrentIteration = mutable.LinkedHashSet[(BlockId, Int)]()
375374
var localBlockBytes = 0L
376375
var hostLocalBlockBytes = 0L
376+
var numHostLocalBlocks = 0
377377
var pushMergedLocalBlockBytes = 0L
378378
val prevNumBlocksToFetch = numBlocksToFetch
379379

@@ -404,7 +404,7 @@ final class ShuffleBlockFetcherIterator(
404404
val blocksForAddress =
405405
mergedBlockInfos.map(info => (info.blockId, info.size, info.mapIndex))
406406
hostLocalBlocksByExecutor += address -> blocksForAddress
407-
hostLocalBlocksCurrentIteration ++= blocksForAddress.map(info => (info._1, info._3))
407+
numHostLocalBlocks += blocksForAddress.size
408408
hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
409409
} else {
410410
val (_, timeCost) = Utils.timeTakenMs[Unit] {
@@ -419,21 +419,22 @@ final class ShuffleBlockFetcherIterator(
419419
pushMergedLocalBlockBytes
420420
val blocksToFetchCurrentIteration = numBlocksToFetch - prevNumBlocksToFetch
421421
assert(blocksToFetchCurrentIteration == localBlocks.size +
422-
hostLocalBlocksCurrentIteration.size + numRemoteBlocks + pushMergedLocalBlocks.size,
423-
s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't equal to " +
424-
s"the number of local blocks ${localBlocks.size} + " +
425-
s"the number of host-local blocks ${hostLocalBlocksCurrentIteration.size} " +
422+
numHostLocalBlocks + numRemoteBlocks + pushMergedLocalBlocks.size,
423+
s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't equal to the sum " +
424+
s"of the number of local blocks ${localBlocks.size} + " +
425+
s"the number of host-local blocks ${numHostLocalBlocks} " +
426426
s"the number of push-merged-local blocks ${pushMergedLocalBlocks.size} " +
427427
s"+ the number of remote blocks ${numRemoteBlocks} ")
428428
logInfo(s"Getting $blocksToFetchCurrentIteration " +
429429
s"(${Utils.bytesToString(totalBytes)}) non-empty blocks including " +
430430
s"${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local and " +
431-
s"${hostLocalBlocksCurrentIteration.size} (${Utils.bytesToString(hostLocalBlockBytes)}) " +
431+
s"${numHostLocalBlocks} (${Utils.bytesToString(hostLocalBlockBytes)}) " +
432432
s"host-local and ${pushMergedLocalBlocks.size} " +
433433
s"(${Utils.bytesToString(pushMergedLocalBlockBytes)}) " +
434-
s"local push-merged and $numRemoteBlocks (${Utils.bytesToString(remoteBlockBytes)}) " +
434+
s"push-merged-local and $numRemoteBlocks (${Utils.bytesToString(remoteBlockBytes)}) " +
435435
s"remote blocks")
436-
this.hostLocalBlocks ++= hostLocalBlocksCurrentIteration
436+
this.hostLocalBlocks ++= hostLocalBlocksByExecutor.values
437+
.flatMap { infos => infos.map(info => (info._1, info._3)) }
437438
collectedRemoteRequests
438439
}
439440

@@ -883,9 +884,9 @@ final class ShuffleBlockFetcherIterator(
883884
// We get this result in 3 cases:
884885
// 1. Failure to fetch the data of a remote shuffle chunk. In this case, the
885886
// blockId is a ShuffleBlockChunkId.
886-
// 2. Failure to read the local push-merged meta. In this case, the blockId is
887+
// 2. Failure to read the push-merged-local meta. In this case, the blockId is
887888
// ShuffleBlockId.
888-
// 3. Failure to get the local push-merged directories from the ESS. In this case, the
889+
// 3. Failure to get the push-merged-local directories from the ESS. In this case, the
889890
// blockId is ShuffleBlockId.
890891
if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
891892
numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
@@ -900,8 +901,8 @@ final class ShuffleBlockFetcherIterator(
900901
// a SuccessFetchResult or a FailureFetchResult.
901902
result = null
902903

903-
case PushMergedLocalMetaFetchResult(shuffleId, reduceId, bitmaps, localDirs, _) =>
904-
// Fetch local push-merged shuffle block data as multiple shuffle chunks
904+
case PushMergedLocalMetaFetchResult(shuffleId, reduceId, bitmaps, localDirs) =>
905+
// Fetch push-merged-local shuffle block data as multiple shuffle chunks
905906
val shuffleBlockId = ShuffleBlockId(shuffleId, SHUFFLE_PUSH_MAP_ID, reduceId)
906907
try {
907908
val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
@@ -922,17 +923,17 @@ final class ShuffleBlockFetcherIterator(
922923
}
923924
} catch {
924925
case e: Exception =>
925-
// If we see an exception with reading local push-merged data, we fallback to
926-
// fetch the original blocks. We do not report block fetch failure
926+
// If we see an exception with reading push-merged-local index file, we fallback
927+
// to fetch the original blocks. We do not report block fetch failure
927928
// and will continue with the remaining local block read.
928-
logWarning(s"Error occurred while fetching local push-merged data, " +
929+
logWarning(s"Error occurred while reading push-merged-local index, " +
929930
s"prepare to fetch the original blocks", e)
930931
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
931932
shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
932933
}
933934
result = null
934935

935-
case PushMergedRemoteMetaFetchResult(shuffleId, reduceId, blockSize, bitmaps, address, _) =>
936+
case PushMergedRemoteMetaFetchResult(shuffleId, reduceId, blockSize, bitmaps, address) =>
936937
// The original meta request is processed so we decrease numBlocksToFetch and
937938
// numBlocksInFlightPerAddress by 1. We will collect new shuffle chunks request and the
938939
// count of this is added to numBlocksToFetch in collectFetchReqsFromMergedBlocks.
@@ -946,7 +947,7 @@ final class ShuffleBlockFetcherIterator(
946947
// Set result to null to force another iteration.
947948
result = null
948949

949-
case PushMergedRemoteMetaFailedFetchResult(shuffleId, reduceId, address, _) =>
950+
case PushMergedRemoteMetaFailedFetchResult(shuffleId, reduceId, address) =>
950951
// The original meta request failed so we decrease numBlocksInFlightPerAddress by 1.
951952
numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
952953
// If we fail to fetch the meta of a push-merged block, we fall back to fetching the
@@ -1071,8 +1072,8 @@ final class ShuffleBlockFetcherIterator(
10711072
results.put(result)
10721073
}
10731074

1074-
private[storage] def incrementNumBlocksToFetch(moreBlocksToFetch: Int): Unit = {
1075-
numBlocksToFetch += moreBlocksToFetch
1075+
private[storage] def decreaseNumBlocksToFetch(blocksFetched: Int): Unit = {
1076+
numBlocksToFetch -= blocksFetched
10761077
}
10771078

10781079
/**
@@ -1091,7 +1092,7 @@ final class ShuffleBlockFetcherIterator(
10911092
originalLocalBlocks, originalHostLocalBlocksByExecutor, originalMergedLocalBlocks)
10921093
// Add the remote requests into our queue in a random order
10931094
fetchRequests ++= Utils.randomize(originalRemoteReqs)
1094-
logInfo(s"Started ${originalRemoteReqs.size} fallback remote requests for push-merged")
1095+
logInfo(s"Created ${originalRemoteReqs.size} fallback remote requests for push-merged")
10951096
// fetch all the fallback blocks that are local.
10961097
fetchLocalBlocks(originalLocalBlocks)
10971098
// Merged local blocks should be empty during fallback
@@ -1246,12 +1247,6 @@ object ShuffleBlockFetcherIterator {
12461247
}
12471248
}
12481249

1249-
/**
1250-
* Dummy shuffle block id to fill into [[PushMergedRemoteMetaFetchResult]] and
1251-
* [[PushMergedRemoteMetaFailedFetchResult]], to match the [[FetchResult]] trait.
1252-
*/
1253-
private val DUMMY_SHUFFLE_BLOCK_ID = ShuffleBlockId(-1, -1, -1)
1254-
12551250
/**
12561251
* This function is used to merged blocks when doBatchFetch is true. Blocks which have the
12571252
* same `mapId` can be merged into one block batch. The block batch is specified by a range
@@ -1436,8 +1431,7 @@ object ShuffleBlockFetcherIterator {
14361431
reduceId: Int,
14371432
blockSize: Long,
14381433
bitmaps: Array[RoaringBitmap],
1439-
address: BlockManagerId,
1440-
blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
1434+
address: BlockManagerId) extends FetchResult
14411435

14421436
/**
14431437
* Result of a failure while fetching the meta information for a remote push-merged block.
@@ -1449,11 +1443,10 @@ object ShuffleBlockFetcherIterator {
14491443
private[storage] case class PushMergedRemoteMetaFailedFetchResult(
14501444
shuffleId: Int,
14511445
reduceId: Int,
1452-
address: BlockManagerId,
1453-
blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
1446+
address: BlockManagerId) extends FetchResult
14541447

14551448
/**
1456-
* Result of a successful fetch of meta information for a local push-merged block.
1449+
* Result of a successful fetch of meta information for a push-merged-local block.
14571450
*
14581451
* @param shuffleId shuffle id.
14591452
* @param reduceId reduce id.
@@ -1464,6 +1457,5 @@ object ShuffleBlockFetcherIterator {
14641457
shuffleId: Int,
14651458
reduceId: Int,
14661459
bitmaps: Array[RoaringBitmap],
1467-
localDirs: Array[String],
1468-
blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
1460+
localDirs: Array[String]) extends FetchResult
14691461
}

core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,7 +1263,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
12631263
assert(!iterator.hasNext)
12641264
}
12651265

1266-
test("SPARK-32922: failure to fetch local push-merged meta should fallback to fetch " +
1266+
test("SPARK-32922: failure to fetch push-merged-local meta should fallback to fetch " +
12671267
"original shuffle blocks") {
12681268
val blockManager = mock(classOf[BlockManager])
12691269
val localDirs = Array("testPath1", "testPath2")
@@ -1276,7 +1276,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
12761276
verifyLocalBlocksFromFallback(iterator)
12771277
}
12781278

1279-
test("SPARK-32922: failure to reading chunkBitmaps of local push-merged meta should " +
1279+
test("SPARK-32922: failure to reading chunkBitmaps of push-merged-local meta should " +
12801280
"fallback to original shuffle blocks") {
12811281
val blockManager = mock(classOf[BlockManager])
12821282
val localDirs = Array("local-dir")
@@ -1288,7 +1288,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
12881288
verifyLocalBlocksFromFallback(iterator)
12891289
}
12901290

1291-
test("SPARK-32922: failure to fetch local push-merged data should fallback to fetch " +
1291+
test("SPARK-32922: failure to fetch push-merged-local data should fallback to fetch " +
12921292
"original shuffle blocks") {
12931293
val blockManager = mock(classOf[BlockManager])
12941294
val localDirs = Array("testPath1", "testPath2")
@@ -1301,8 +1301,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
13011301
verifyLocalBlocksFromFallback(iterator)
13021302
}
13031303

1304-
test("SPARK-32922: failure to fetch local push-merged meta of a single merged block " +
1305-
"should not drop the fetch of other local push-merged blocks") {
1304+
test("SPARK-32922: failure to fetch push-merged-local meta of a single merged block " +
1305+
"should not drop the fetch of other push-merged-local blocks") {
13061306
val blockManager = mock(classOf[BlockManager])
13071307
val localDirs = Array("testPath1", "testPath2")
13081308
prepareForFallbackToLocalBlocks(
@@ -1375,7 +1375,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
13751375
intercept[FetchFailedException] { iterator.next() }
13761376
}
13771377

1378-
test("SPARK-32922: failure to fetch local push-merged block should fallback to fetch " +
1378+
test("SPARK-32922: failure to fetch push-merged-local block should fallback to fetch " +
13791379
"original shuffle blocks which contain host-local blocks") {
13801380
val blockManager = mock(classOf[BlockManager])
13811381
// BlockManagerId from another executor on the same host

0 commit comments

Comments
 (0)