Changes from all 27 commits:
df971da
Magnet shuffle service fetch block protocol
otterc May 9, 2020
041ca70
LIHADOOP-53321 Magnet: Merge client shuffle block fetcher related cha…
otterc May 11, 2020
6762de3
LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of…
otterc Jul 24, 2020
5bbe466
Changed the MergedBlockMetaRequest
otterc Dec 13, 2020
ff80579
empty commit from Min
Victsm Dec 14, 2020
ef464aa
Created PushBasedFetchHelper that encapsulates all push-based function…
otterc May 27, 2021
2d9a98a
Fixing my review comments
otterc May 27, 2021
6fb6f16
More styling and nit fixes
otterc May 27, 2021
a1a0674
Addressed more of Mridul's comments
otterc Jun 4, 2021
fcde43d
Removed all the changes which are part of SPARK-35671
otterc Jun 8, 2021
cbc62fa
Addressed wuyi's comments
otterc Jun 9, 2021
c073a8e
Fixed indentation of PushBasedFetchHelper
otterc Jun 9, 2021
52b53a6
Addressed Mridul's comments, accounting for meta requests in numBlock…
otterc Jun 9, 2021
9ae32fb
addressed wuyi's comments
otterc Jun 10, 2021
daf8d8e
Addressing Mridul's comments
otterc Jun 10, 2021
6e602af
Rebasing against master
otterc Jun 10, 2021
c698c37
Addressed Mridul's comments, rebased with master, and another UT
otterc Jun 10, 2021
429b759
Addressed Mridul's comments
otterc Jun 11, 2021
926f0b9
Added clarifying comments about chunksMetaMap concurrency
otterc Jun 11, 2021
94d7c5e
Fixed the line length
otterc Jun 11, 2021
67bd821
Addressed yi.wu's comments
otterc Jun 16, 2021
fc1b9f1
Update core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetch…
otterc Jun 23, 2021
390300a
Addressed Mridul's comments
otterc Jun 23, 2021
d53293e
Removed passing numChunks
otterc Jun 24, 2021
69fcc23
Rebased against apache master
otterc Jun 28, 2021
c10b943
Addressed Mridul's comments
otterc Jun 29, 2021
ad89a02
Addressed review comments
otterc Jun 29, 2021
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1453,7 +1453,8 @@ private[spark] object MapOutputTracker extends Logging {
       // ShuffleBlockId with mapId being SHUFFLE_PUSH_MAP_ID to indicate this is
       // a merged shuffle block.
       splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=
-        ((ShuffleBlockId(shuffleId, SHUFFLE_PUSH_MAP_ID, partId), mergeStatus.totalSize, -1))
+        ((ShuffleBlockId(shuffleId, SHUFFLE_PUSH_MAP_ID, partId), mergeStatus.totalSize,
+          SHUFFLE_PUSH_MAP_ID))
       // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper
       // shuffle partition blocks, fetch the original map produced shuffle partition blocks
       val mapStatusesWithIndex = mapStatuses.zipWithIndex
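The change above replaces a bare -1 in the tuple's mapIndex slot with the named SHUFFLE_PUSH_MAP_ID constant, so merged shuffle blocks carry an explicit sentinel. A minimal sketch of how downstream code can branch on it, assuming the (BlockId, size, mapIndex) tuple shape used by MapOutputTracker and that the constant's value is -1; isMergedShuffleBlock is a hypothetical helper, not code from this PR:

```scala
import org.apache.spark.storage.{BlockId, ShuffleBlockId}

object MergedBlockCheck {
  // Sentinel map id for merged (push-based) shuffle blocks; assumed to be -1,
  // matching MapOutputTracker.SHUFFLE_PUSH_MAP_ID.
  val SHUFFLE_PUSH_MAP_ID: Int = -1

  // Hypothetical helper: detect a merged-block entry of shape (id, size, mapIndex).
  def isMergedShuffleBlock(entry: (BlockId, Long, Int)): Boolean = entry match {
    case (ShuffleBlockId(_, mapId, _), _, mapIndex) =>
      mapId == SHUFFLE_PUSH_MAP_ID && mapIndex == SHUFFLE_PUSH_MAP_ID
    case _ => false
  }
}
```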
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala

@@ -110,6 +110,7 @@ private[spark] class SerializerManager(
   private def shouldCompress(blockId: BlockId): Boolean = {
     blockId match {
       case _: ShuffleBlockId => compressShuffle
+      case _: ShuffleBlockChunkId => compressShuffle
       case _: BroadcastBlockId => compressBroadcast
       case _: RDDBlockId => compressRdds
       case _: TempLocalBlockId => compressShuffleSpill
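The added case means shouldCompress answers the same for a merged-shuffle chunk as for an ordinary shuffle block. A hedged sketch of the observable effect through the public wrapStream API (constructor and signatures as in Spark 3.2-era code; treat as illustrative, not this PR's code):

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
import org.apache.spark.storage.ShuffleBlockChunkId

val conf = new SparkConf().set("spark.shuffle.compress", "true")
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)

// wrapStream consults the private shouldCompress, so chunk data written through
// it is now compressed under the same flag that governs full shuffle blocks.
val wrapped = serializerManager.wrapStream(
  ShuffleBlockChunkId(shuffleId = 0, reduceId = 0, chunkId = 0),
  new ByteArrayOutputStream())
```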
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

@@ -35,6 +35,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     readMetrics: ShuffleReadMetricsReporter,
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
+    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
     shouldBatchFetch: Boolean = false)
   extends ShuffleReader[K, C] with Logging {

@@ -71,6 +72,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       context,
       blockManager.blockStoreClient,
       blockManager,
+      mapOutputTracker,
       blocksByAddress,
       serializerManager.wrapStream,
       // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
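Threading mapOutputTracker through to ShuffleBlockFetcherIterator is what enables falling back to the original map-produced blocks when fetching a merged block or chunk fails. The sketch below shows the shape of that lookup; the tracker method name and signature are an assumption about the surrounding push-based shuffle work, not something this diff defines:

```scala
import org.apache.spark.MapOutputTracker
import org.apache.spark.storage.{BlockId, BlockManagerId}

// Hypothetical fallback lookup: given a merged block for (shuffleId, reduceId)
// that failed to fetch, ask the tracker for the original per-mapper blocks.
def fallbackBlocks(
    tracker: MapOutputTracker,
    shuffleId: Int,
    reduceId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] =
  tracker.getMapSizesForMergeResult(shuffleId, reduceId) // assumed API
```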
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -43,6 +43,7 @@ sealed abstract class BlockId {
     (isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] ||
       isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId])
   }
+  def isShuffleChunk: Boolean = isInstanceOf[ShuffleBlockChunkId]
   def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

   override def toString: String = name
@@ -72,6 +73,15 @@ case class ShuffleBlockBatchId(
   }
 }

+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleBlockChunkId(
+    shuffleId: Int,
+    reduceId: Int,
+    chunkId: Int) extends BlockId {
+  override def name: String = "shuffleChunk_" + shuffleId + "_" + reduceId + "_" + chunkId
+}
+
 @DeveloperApi
 case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
   override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
@@ -152,14 +162,15 @@ class UnrecognizedBlockId(name: String)
 @DeveloperApi
 object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
-  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
+  val SHUFFLE = "shuffle_([0-9]+)_(-?[0-9]+)_([0-9]+)".r
   val SHUFFLE_BATCH = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)".r
   val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
   val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
   val SHUFFLE_PUSH = "shufflePush_([0-9]+)_([0-9]+)_([0-9]+)".r
   val SHUFFLE_MERGED_DATA = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).data".r
   val SHUFFLE_MERGED_INDEX = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).index".r
   val SHUFFLE_MERGED_META = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).meta".r
+  val SHUFFLE_CHUNK = "shuffleChunk_([0-9]+)_([0-9]+)_([0-9]+)".r
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
@@ -186,6 +197,8 @@ object BlockId {
       ShuffleMergedIndexBlockId(appId, shuffleId.toInt, reduceId.toInt)
     case SHUFFLE_MERGED_META(appId, shuffleId, reduceId) =>
       ShuffleMergedMetaBlockId(appId, shuffleId.toInt, reduceId.toInt)
+    case SHUFFLE_CHUNK(shuffleId, reduceId, chunkId) =>
+      ShuffleBlockChunkId(shuffleId.toInt, reduceId.toInt, chunkId.toInt)
     case BROADCAST(broadcastId, field) =>
       BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
     case TASKRESULT(taskId) =>
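A quick round trip of the new id against the patterns above: shuffleChunk names parse into ShuffleBlockChunkId, and the relaxed (-?[0-9]+) mapId group lets the -1 sentinel used for merged blocks parse instead of raising UnrecognizedBlockId. This sketch is derived directly from the regexes and case clauses in this diff:

```scala
import org.apache.spark.storage.{BlockId, ShuffleBlockChunkId, ShuffleBlockId}

val chunk = BlockId("shuffleChunk_5_11_2")
assert(chunk == ShuffleBlockChunkId(shuffleId = 5, reduceId = 11, chunkId = 2))
assert(chunk.isShuffleChunk)
assert(chunk.name == "shuffleChunk_5_11_2")

// mapId may now be negative: the merged-block sentinel round-trips too.
assert(BlockId("shuffle_5_-1_11") == ShuffleBlockId(5, -1, 11))
```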
@@ -132,6 +132,11 @@ private[spark] class HostLocalDirManager(
     executorIdToLocalDirsCache.asMap().asScala.toMap
   }

+  private[spark] def getCachedHostLocalDirsFor(executorId: String): Option[Array[String]] =
+    executorIdToLocalDirsCache.synchronized {
+      Option(executorIdToLocalDirsCache.getIfPresent(executorId))
+    }
+
   private[spark] def getHostLocalDirs(
       host: String,
       port: Int,
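getCachedHostLocalDirsFor adds a single-executor probe of the local-dirs cache, taken under the same lock as the other accessors. A hedged caller sketch (the helper name and Either shape are invented for illustration): on a miss, callers still go through the async getHostLocalDirs RPC, which repopulates the cache for next time:

```scala
import org.apache.spark.storage.HostLocalDirManager

// Hypothetical caller: prefer cached dirs for a host-local executor; a Left
// signals that the caller must fall back to the getHostLocalDirs RPC path.
def cachedDirsOrRpcHint(
    manager: HostLocalDirManager,
    executorId: String): Either[String, Array[String]] =
  manager.getCachedHostLocalDirsFor(executorId)
    .toRight(s"no cached dirs for executor $executorId; use getHostLocalDirs")
```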