Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,12 @@ final class ShuffleBlockFetcherIterator(
}
}

// Shuffle remote blocks to disk when the request is too large.
// TODO: Encryption and compression should be considered.
// Fetch remote shuffle blocks to disk when the request is too large. Since the shuffle data is
// already encrypted and compressed over the wire(w.r.t. the related configs), we can just fetch
// the data and write it to file directly.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change is really good. Sorry for my ambiguous.

if (req.size > maxReqSizeShuffleToMem) {
val shuffleFiles = blockIds.map {
bId => blockManager.diskBlockManager.createTempLocalBlock()._2
val shuffleFiles = blockIds.map { _ =>
blockManager.diskBlockManager.createTempLocalBlock()._2
}.toArray
shuffleFilesSet ++= shuffleFiles
shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.network.util.LimitedInputStream
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util.Utils


class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
Expand Down Expand Up @@ -420,9 +421,10 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
doReturn(localBmId).when(blockManager).blockManagerId

val diskBlockManager = mock(classOf[DiskBlockManager])
val tmpDir = Utils.createTempDir()
doReturn{
var blockId = new TempLocalBlockId(UUID.randomUUID())
(blockId, new File(blockId.name))
val blockId = TempLocalBlockId(UUID.randomUUID())
(blockId, new File(tmpDir, blockId.name))
}.when(diskBlockManager).createTempLocalBlock()
doReturn(diskBlockManager).when(blockManager).diskBlockManager

Expand All @@ -443,34 +445,34 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
}
})

def fetchShuffleBlock(blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])]): Unit = {
// Set `maxBytesInFlight` and `maxReqsInFlight` to `Int.MaxValue`, so that during the
// construction of `ShuffleBlockFetcherIterator`, all requests to fetch remote shuffle blocks
// are issued. The `maxReqSizeShuffleToMem` is hard-coded as 200 here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

new ShuffleBlockFetcherIterator(
TaskContext.empty(),
transfer,
blockManager,
blocksByAddress,
(_, in) => in,
maxBytesInFlight = Int.MaxValue,
maxReqsInFlight = Int.MaxValue,
maxReqSizeShuffleToMem = 200,
detectCorrupt = true)
}

val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100L)).toSeq))
// Set maxReqSizeShuffleToMem to be 200.
val iterator1 = new ShuffleBlockFetcherIterator(
TaskContext.empty(),
transfer,
blockManager,
blocksByAddress1,
(_, in) => in,
Int.MaxValue,
Int.MaxValue,
200,
true)
fetchShuffleBlock(blocksByAddress1)
// `maxReqSizeShuffleToMem` is 200, which is greater than the block size 100, so don't fetch
// shuffle block to disk.
assert(shuffleFiles === null)

val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq))
// Set maxReqSizeShuffleToMem to be 200.
val iterator2 = new ShuffleBlockFetcherIterator(
TaskContext.empty(),
transfer,
blockManager,
blocksByAddress2,
(_, in) => in,
Int.MaxValue,
Int.MaxValue,
200,
true)
fetchShuffleBlock(blocksByAddress2)
// `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 300, so fetch
// shuffle block to disk.
assert(shuffleFiles != null)
}
}