core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -18,7 +18,7 @@
package org.apache.spark.network

import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockManagerId, BlockId, StorageLevel}

private[spark]
trait BlockDataManager {
@@ -29,6 +29,12 @@ trait BlockDataManager {
*/
def getBlockData(blockId: BlockId): ManagedBuffer

+/**
+ * Interface to get another executor's block data on the same node, identified by
+ * blockManagerId. Throws an exception if the block cannot be found or cannot be
+ * read successfully.
+ */
+def getBlockData(blockId: BlockId, blockManagerId: BlockManagerId): ManagedBuffer

/**
* Put the block locally, using the given storage level.
*/
core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -162,7 +162,7 @@ class FileShuffleBlockManager(conf: SparkConf)
val fileId = shuffleState.nextFileId.getAndIncrement()
val files = Array.tabulate[File](numBuckets) { bucketId =>
val filename = physicalFileName(shuffleId, bucketId, fileId)
-blockManager.diskBlockManager.getFile(filename)
+blockManager.diskBlockManager.getFile(filename, blockManager.blockManagerId)
}
val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
shuffleState.allFileGroups.add(fileGroup)
@@ -180,7 +180,8 @@ class FileShuffleBlockManager(conf: SparkConf)
Some(segment.nioByteBuffer())
}

-override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+override def getBlockData(blockId: ShuffleBlockId,
+    blockManagerId: BlockManagerId = blockManager.blockManagerId): ManagedBuffer = {
Member:
Why does this patch only support SortShuffleManager? ISTM the same logic could be
straightforwardly implemented in HashShuffleManager.

Contributor Author:
It doesn't support HashShuffle when consolidateShuffleFiles is enabled. To avoid
confusion, this patch only supports SortShuffleManager.
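A minimal sketch of the asymmetry described above, with simplified stand-ins for Spark's shuffle bookkeeping (none of these names are Spark's actual classes): a sort-shuffle file name can be recomputed from the block id alone, whereas a consolidated hash-shuffle segment can only be resolved through the writing executor's in-memory state.

```scala
object ConsolidationSketch {
  // Sort-based shuffle: the data file name is a pure function of the block id,
  // so any executor on the node can reconstruct it.
  def sortShuffleDataFile(shuffleId: Int, mapId: Int): String =
    s"shuffle_${shuffleId}_${mapId}_0.data"

  final case class FileSegment(file: String, offset: Long, length: Long)

  // Hash-based shuffle with consolidation: several map outputs share one file,
  // and the per-block (file, offset, length) lives only in the writing
  // executor's in-memory shuffle state.
  final case class ShuffleFileGroup(segments: Map[(Int, Int), FileSegment])

  def lookupConsolidated(
      shuffleStates: Map[Int, ShuffleFileGroup],
      shuffleId: Int,
      mapId: Int,
      reduceId: Int): Option[FileSegment] =
    shuffleStates.get(shuffleId).flatMap(_.segments.get((mapId, reduceId)))

  def main(args: Array[String]): Unit = {
    println(sortShuffleDataFile(0, 5))              // any executor can compute this name
    println(lookupConsolidated(Map.empty, 0, 5, 0)) // None without the writer's state
  }
}
```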

if (consolidateShuffleFiles) {
// Search all file groups associated with this shuffle.
val shuffleState = shuffleStates(blockId.shuffleId)
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -52,12 +52,16 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
ShuffleBlockId(shuffleId, mapId, 0)
}

-def getDataFile(shuffleId: Int, mapId: Int): File = {
-  blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
+def getDataFile(shuffleId: Int,
+    mapId: Int,
+    blockManagerId: BlockManagerId = blockManager.blockManagerId): File = {
Contributor:
Hey @viper-kun, the style here and in other places should be:

def getDataFile(
    shuffleId: Int,
    mapId: Int,
    blockManagerId: BlockManagerId = ...): File = {
  ...
}

+  blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0), blockManagerId)
}

-private def getIndexFile(shuffleId: Int, mapId: Int): File = {
-  blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
+private def getIndexFile(shuffleId: Int,
+    mapId: Int,
+    blockManagerId: BlockManagerId = blockManager.blockManagerId): File = {
+  blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0), blockManagerId)
}

/**
@@ -101,10 +105,11 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
Some(getBlockData(blockId).nioByteBuffer())
}

-override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+override def getBlockData(blockId: ShuffleBlockId,
+    blockManagerId: BlockManagerId = blockManager.blockManagerId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
-val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
+val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId, blockManagerId)

val in = new DataInputStream(new FileInputStream(indexFile))
try {
@@ -113,7 +118,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
-getDataFile(blockId.shuffleId, blockId.mapId),
+getDataFile(blockId.shuffleId, blockId.mapId, blockManagerId),
offset,
nextOffset - offset)
} finally {
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle

import java.nio.ByteBuffer
import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId}

private[spark]
trait ShuffleBlockManager {
@@ -31,7 +31,7 @@ trait ShuffleBlockManager {
*/
def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]

-def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
+def getBlockData(blockId: ShuffleBlockId, blockManagerId: BlockManagerId): ManagedBuffer

def stop(): Unit
}
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (13 additions, 6 deletions)
@@ -202,7 +202,8 @@ private[spark] class BlockManager(
blockManagerId
}

-master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+master.registerBlockManager(
+  blockManagerId, maxMemory, slaveActor, diskBlockManager.getLocalDirsPath())

// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
@@ -265,7 +266,8 @@ private[spark] class BlockManager(
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo("BlockManager re-registering with master")
-master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+master.registerBlockManager(
+  blockManagerId, maxMemory, slaveActor, diskBlockManager.getLocalDirsPath())
reportAllBlocks()
}

@@ -295,13 +297,18 @@ private[spark] class BlockManager(
}
}

+override def getBlockData(blockId: BlockId): ManagedBuffer = {
+  getBlockData(blockId, blockManagerId)
+}

/**
-* Interface to get local block data. Throws an exception if the block cannot be found or
-* cannot be read successfully.
+* Interface to get another executor's block data on the same node, identified by blockManagerId.
+* Throws an exception if the block cannot be found or cannot be read successfully.
*/
-override def getBlockData(blockId: BlockId): ManagedBuffer = {
+override def getBlockData(blockId: BlockId, blockManagerId: BlockManagerId): ManagedBuffer = {
if (blockId.isShuffle) {
-shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
+shuffleManager.shuffleBlockManager.getBlockData(
+  blockId.asInstanceOf[ShuffleBlockId], blockManagerId)
} else {
val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
.asInstanceOf[Option[ByteBuffer]]
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -46,9 +46,12 @@ class BlockManagerMaster(
}

/** Register the BlockManager's id with the driver. */
-def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+def registerBlockManager(blockManagerId: BlockManagerId,
+    maxMemSize: Long,
+    slaveActor: ActorRef,
+    localDirs: Array[String]) {
logInfo("Trying to register BlockManager")
-tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, localDirs))
logInfo("Registered BlockManager")
}

@@ -75,6 +78,11 @@ class BlockManagerMaster(
askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}

+/** Return the local dirs of other BlockManagers on the same machine as blockManagerId. */
+def getLocalDirsPath(blockManagerId: BlockManagerId): Map[BlockManagerId, Array[String]] = {
+  askDriverWithReply[Map[BlockManagerId, Array[String]]](GetLocalDirsPath(blockManagerId))
+}

/**
* Check if block manager master has a block. Note that this can be used to check for only
* those blocks that are reported to block manager master.
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -53,8 +53,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
private val akkaTimeout = AkkaUtils.askTimeout(conf)

override def receiveWithLogging: PartialFunction[Any, Unit] = {
-case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
-  register(blockManagerId, maxMemSize, slaveActor)
+case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, localDirsPath) =>
+  register(blockManagerId, maxMemSize, slaveActor, localDirsPath)
sender ! true

case UpdateBlockInfo(
@@ -77,6 +77,9 @@
case GetMemoryStatus =>
sender ! memoryStatus

+case GetLocalDirsPath(blockManagerId) =>
+  sender ! getLocalDirsPath(blockManagerId)

case GetStorageStatus =>
sender ! storageStatus

@@ -223,6 +226,15 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}

+// Return the local dirs of other BlockManagers on the same machine as blockManagerId
+private def getLocalDirsPath(
+    blockManagerId: BlockManagerId): Map[BlockManagerId, Array[String]] = {
+  blockManagerInfo
+    .filter { case(id, _) => (id != blockManagerId && id.host == blockManagerId.host)}
Member:
Nit: Unnecessary parentheses.

+    .mapValues { info => info.localDirsPath }
+    .toMap
+}
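For illustration, a self-contained sketch of the host filtering performed above, with simplified stand-ins for BlockManagerId and BlockManagerInfo (not Spark's actual classes):

```scala
final case class Id(executorId: String, host: String)
final case class Info(localDirsPath: Array[String])

object LocalDirsLookupSketch {
  // Same shape as the master actor's lookup: keep entries on the requester's
  // host, excluding the requester itself, and expose only their local dirs.
  def getLocalDirsPath(infos: Map[Id, Info], requester: Id): Map[Id, Array[String]] =
    infos
      .filter { case (id, _) => id != requester && id.host == requester.host }
      .map { case (id, info) => id -> info.localDirsPath }

  def main(args: Array[String]): Unit = {
    val a = Id("1", "host-a")
    val b = Id("2", "host-a")
    val c = Id("3", "host-b")
    val infos = Map(
      a -> Info(Array("/tmp/spark-a")),
      b -> Info(Array("/tmp/spark-b")),
      c -> Info(Array("/tmp/spark-c")))
    // Executor 1 asks for its siblings: only executor 2 (same host) is returned.
    getLocalDirsPath(infos, a).foreach { case (id, dirs) =>
      println(s"${id.executorId} -> ${dirs.mkString(",")}")
    }
  }
}
```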

// Return a map from the block manager id to max memory and remaining memory.
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
blockManagerInfo.map { case(blockManagerId, info) =>
@@ -291,7 +303,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
).map(_.flatten.toSeq)
}

-private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+private def register(
+    id: BlockManagerId,
+    maxMemSize: Long,
+    slaveActor: ActorRef, localDirsPath: Array[String]) {
val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
Expand All @@ -308,7 +323,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockManagerIdByExecutor(id.executorId) = id

blockManagerInfo(id) = new BlockManagerInfo(
-id, System.currentTimeMillis(), maxMemSize, slaveActor)
+id, System.currentTimeMillis(), maxMemSize, slaveActor, localDirsPath)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
@@ -320,7 +335,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
memSize: Long,
diskSize: Long,
tachyonSize: Long): Boolean = {
-
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
@@ -412,7 +426,8 @@ private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
-val slaveActor: ActorRef)
+val slaveActor: ActorRef,
+val localDirsPath: Array[String])
extends Logging {

private var _lastSeenMs: Long = timeMs
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -52,7 +52,8 @@ private[spark] object BlockManagerMessages {
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long,
-sender: ActorRef)
+sender: ActorRef,
+subDirs: Array[String])
extends ToBlockManagerMaster

case class UpdateBlockInfo(
@@ -109,4 +110,6 @@
extends ToBlockManagerMaster

case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

+case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
}
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (60 additions, 24 deletions)
@@ -24,6 +24,8 @@ import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.util.Utils

+import scala.collection.mutable

/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
* locations. By default, one block is mapped to one file with a name given by its BlockId.
@@ -51,40 +53,74 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

+private val localDirsByBlkMgr = new mutable.HashMap[BlockManagerId, Array[String]]
+
+def getLocalDirsPath(): Array[String] = {
+  localDirs.map(file => file.getAbsolutePath)
+}

private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
-  def getFile(filename: String): File = {
-    // Figure out which local directory it hashes to, and which subdirectory in that
-    val hash = Utils.nonNegativeHash(filename)
-    val dirId = hash % localDirs.length
-    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
-
-    // Create the subdirectory if it doesn't already exist
-    val subDir = subDirs(dirId).synchronized {
-      val old = subDirs(dirId)(subDirId)
-      if (old != null) {
-        old
-      } else {
-        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && !newDir.mkdir()) {
-          throw new IOException(s"Failed to create local dir in $newDir.")
-        }
-        subDirs(dirId)(subDirId) = newDir
-        newDir
-      }
-    }
-
-    new File(subDir, filename)
-  }
+  def getFile(
+      fileName: String,
+      blockManagerId: BlockManagerId): File = {
+    val hash = Utils.nonNegativeHash(fileName)
+    val createDirIfAbsent =
+      blockManagerId.executorId == blockManager.blockManagerId.executorId
+
+    if (createDirIfAbsent) {
+      val dirId = hash % localDirs.length
+      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
+
+      // Create the subdirectory if it doesn't already exist
+      var subDir = subDirs(dirId)(subDirId)
+      if (subDir == null) {
+        subDir = subDirs(dirId).synchronized {
+          val old = subDirs(dirId)(subDirId)
+          if (old != null) {
+            old
+          } else {
+            val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+            if (!newDir.exists() && !newDir.mkdir()) {
+              throw new IOException(s"Failed to create local dir in $newDir.")
+            }
+            subDirs(dirId)(subDirId) = newDir
+            newDir
+          }
+        }
+      }
+      new File(subDir, fileName)
+    } else {
+      var tmpLocalDirs = localDirsByBlkMgr.get(blockManagerId)
+      if (!tmpLocalDirs.isDefined) {
+        tmpLocalDirs = localDirsByBlkMgr.synchronized {
+          val old = localDirsByBlkMgr.get(blockManagerId)
+          if(old.isDefined) {
+            old
+          } else {
+            localDirsByBlkMgr ++= blockManager.master.getLocalDirsPath(blockManager.blockManagerId)
+            localDirsByBlkMgr.get(blockManagerId)
+          }
+        }
+      }
+
+      val dirId = hash % tmpLocalDirs.get.length
+      val subDirId = (hash / tmpLocalDirs.get.length) % subDirsPerLocalDir
+      new File(tmpLocalDirs.get(dirId) + "/" + "%02x".format(subDirId), fileName)
+    }
+  }

-  def getFile(blockId: BlockId): File = getFile(blockId.name)
+  def getFile(
+      blockId: BlockId,
+      blockManagerId: BlockManagerId = blockManager.blockManagerId): File = {
+    // val getFromThisExecutor = blockManagerId == blockManager.blockManagerId
+    // val dirs = if (getFromThisExecutor) getLocalDirsPath else localDirsByBlkMgr(blockManagerId)
+    getFile(blockId.name, blockManagerId)
+  }
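The else branch above relies on dirId and subDirId being pure functions of the file name and the peer's number of local dirs, so the path can be recomputed without touching the peer's filesystem. A minimal sketch under that assumption (the paths and block name are hypothetical; nonNegativeHash mirrors the behaviour of org.apache.spark.util.Utils.nonNegativeHash):

```scala
import java.io.File

object RemoteFileResolutionSketch {
  // Mirrors Utils.nonNegativeHash: non-negative, stable across executors.
  private def nonNegativeHash(obj: AnyRef): Int = {
    val hash = if (obj == null) 0 else obj.hashCode
    if (hash != Int.MinValue) math.abs(hash) else 0
  }

  // Recompute a peer's file path from its advertised local dirs alone.
  def resolve(localDirs: Array[String], subDirsPerLocalDir: Int, fileName: String): File = {
    val hash = nonNegativeHash(fileName)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    new File(localDirs(dirId) + "/" + "%02x".format(subDirId), fileName)
  }

  def main(args: Array[String]): Unit = {
    // Dirs a peer executor registered with the master (hypothetical paths);
    // 64 is the default of spark.diskStore.subDirectories.
    val peerDirs = Array("/tmp/spark-local-a", "/tmp/spark-local-b")
    println(resolve(peerDirs, 64, "shuffle_0_5_0.data"))
  }
}
```

Both executors arrive at the identical path for the same block name, which is exactly why this method must stay in sync with StandaloneShuffleBlockManager#getFile().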

/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
-getFile(blockId.name).exists()
+getFile(blockId).exists()
}

/** List all the files currently stored on disk by the disk manager. */