@@ -60,8 +60,7 @@ private[spark] class PythonRDD(
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val localdir = env.blockManager.diskBlockManager.localDirs.map(
-      f => f.getPath()).mkString(",")
+    val localdir = Utils.getRootDirsConf(conf)
     envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
     val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)

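Note on the PythonRDD change: the removed lines built the SPARK_LOCAL_DIRS value for the Python worker by listing the directories the DiskBlockManager had already created, while the added line derives it from configuration via the new Utils.getRootDirsConf(conf) helper, which is not shown in this diff. A minimal sketch of what such a helper might look like, assuming it simply resolves SPARK_LOCAL_DIRS / spark.local.dir; the method body below is an illustration, not the PR's implementation:

import org.apache.spark.SparkConf

object RootDirsSketch {
  // Illustrative stand-in for Utils.getRootDirsConf: resolve the comma-separated root
  // directories from the environment or SparkConf instead of from directories the
  // block manager has already created on disk.
  def getRootDirsConf(conf: SparkConf): String = {
    Option(System.getenv("SPARK_LOCAL_DIRS"))
      .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
  }
}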
@@ -118,7 +118,7 @@ class FileShuffleBlockManager(conf: SparkConf)
     } else {
       Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
         val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-        val blockFile = blockManager.diskBlockManager.getFile(blockId)
+        val blockFile = blockManager.shuffleStore.getDiskBlockManager.getFile(blockId)
         // Because of previous failures, the shuffle file may already exist on this machine.
         // If so, remove it.
         if (blockFile.exists) {
@@ -154,7 +154,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       val fileId = shuffleState.nextFileId.getAndIncrement()
       val files = Array.tabulate[File](numBuckets) { bucketId =>
         val filename = physicalFileName(shuffleId, bucketId, fileId)
-        blockManager.diskBlockManager.getFile(filename)
+        blockManager.shuffleStore.getDiskBlockManager.getFile(filename)
       }
       val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
       shuffleState.allFileGroups.add(fileGroup)
@@ -186,7 +186,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       }
       throw new IllegalStateException("Failed to find shuffle block: " + blockId)
     } else {
-      val file = blockManager.diskBlockManager.getFile(blockId)
+      val file = blockManager.shuffleStore.getDiskBlockManager.getFile(blockId)
       new FileSegmentManagedBuffer(file, 0, file.length)
     }
   }
@@ -211,7 +211,7 @@ class FileShuffleBlockManager(conf: SparkConf)
     } else {
       for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
         val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-        blockManager.diskBlockManager.getFile(blockId).delete()
+        blockManager.shuffleStore.getDiskBlockManager.getFile(blockId).delete()
       }
     }
     logInfo("Deleted all files for shuffle " + shuffleId)
@@ -46,11 +46,11 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
   }

   def getDataFile(shuffleId: Int, mapId: Int): File = {
-    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
+    blockManager.shuffleStore.getDiskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
   }

   private def getIndexFile(shuffleId: Int, mapId: Int): File = {
-    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
+    blockManager.shuffleStore.getDiskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
   }

   /**
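All of the shuffle call sites above now obtain files through blockManager.shuffleStore.getDiskBlockManager rather than a single blockManager.diskBlockManager field. The DiskBlockManager trait they rely on is not shown in this diff; below is a hedged sketch of its surface, inferred from the calls above and from the overrides in the new DefaultDiskBlockManager at the end of this diff (the BlockId-to-name default overload is an assumption):

import java.io.File
import org.apache.spark.storage.{BlockId, TempBlockId}

// Sketch of the DiskBlockManager trait assumed by the shuffle code above and implemented
// by DefaultDiskBlockManager below; the actual trait lives elsewhere in this PR.
private[spark] trait DiskBlockManagerSketch {
  def getFile(filename: String): File
  def getFile(blockId: BlockId): File = getFile(blockId.name)  // assumed convenience overload
  def containsBlock(blockId: BlockId): Boolean
  def getAllFiles(): Seq[File]
  def createTempBlock(): (TempBlockId, File)
}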
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (21 additions, 18 deletions)
@@ -67,14 +67,13 @@ private[spark] class BlockManager(

   blockTransferService.init(this)

-  val diskBlockManager = new DiskBlockManager(this, conf)
-
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

   // Actual storage of where blocks are kept
-  private var tachyonInitialized = false
   private[spark] val memoryStore = new MemoryStore(this, maxMemory)
-  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] val diskStoreManager = new DiskStoreManager(this)
+  private[spark] val shuffleStore = diskStoreManager.shuffleStore

+  private var tachyonInitialized = false
   private[spark] lazy val tachyonStore: TachyonStore = {
     val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
     val appFolderName = conf.get("spark.tachyonStore.folderName")
@@ -239,7 +238,11 @@ private[spark] class BlockManager(
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
     blockInfo.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
-      val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
+      val diskSize = if (diskStoreManager.contains(blockId)) {
+        diskStoreManager.getSize(blockId)
+      } else {
+        0L
+      }
       // Assume that block is not in Tachyon
       BlockStatus(info.level, memSize, diskSize, 0L)
     }
@@ -251,7 +254,7 @@ private[spark] class BlockManager(
    * may not know of).
    */
   def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
-    (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+    (blockInfo.keys ++ diskStoreManager.getAllBlocks()).filter(filter).toSeq
   }

   /**
@@ -312,13 +315,13 @@ private[spark] class BlockManager(
       case level =>
         val inMem = level.useMemory && memoryStore.contains(blockId)
         val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
-        val onDisk = level.useDisk && diskStore.contains(blockId)
+        val onDisk = level.useDisk && diskStoreManager.contains(blockId)
         val deserialized = if (inMem) level.deserialized else false
         val replication = if (inMem || inTachyon || onDisk) level.replication else 1
         val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
         val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
         val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
-        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
+        val diskSize = if (onDisk) diskStoreManager.getSize(blockId) else 0L
         BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
     }
   }
@@ -435,7 +438,7 @@ private[spark] class BlockManager(
     // Look for block on disk, potentially storing it back in memory if required
     if (level.useDisk) {
       logDebug(s"Getting block $blockId from disk")
-      val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+      val bytes: ByteBuffer = diskStoreManager.getBytes(blockId) match {
         case Some(b) => b
         case None =>
           throw new BlockException(
@@ -694,7 +697,7 @@ private[spark] class BlockManager(
         (false, tachyonStore)
       } else if (putLevel.useDisk) {
         // Don't get back the bytes from put unless we replicate them
-        (putLevel.replication > 1, diskStore)
+        (putLevel.replication > 1, diskStoreManager)
       } else {
         assert(putLevel == StorageLevel.NONE)
         throw new BlockException(
@@ -861,13 +864,13 @@ private[spark] class BlockManager(
     val level = info.level

     // Drop to disk, if storage level requires
-    if (level.useDisk && !diskStore.contains(blockId)) {
+    if (level.useDisk && !diskStoreManager.contains(blockId)) {
       logInfo(s"Writing block $blockId to disk")
       data match {
         case Left(elements) =>
-          diskStore.putArray(blockId, elements, level, returnValues = false)
+          diskStoreManager.putArray(blockId, elements, level, returnValues = false)
         case Right(bytes) =>
-          diskStore.putBytes(blockId, bytes, level)
+          diskStoreManager.putBytes(blockId, bytes, level)
       }
       blockIsUpdated = true
     }
@@ -932,7 +935,7 @@ private[spark] class BlockManager(
     info.synchronized {
       // Removals are idempotent in disk store and memory store. At worst, we get a warning.
       val removedFromMemory = memoryStore.remove(blockId)
-      val removedFromDisk = diskStore.remove(blockId)
+      val removedFromDisk = diskStoreManager.remove(blockId)
       val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
       if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
         logWarning(s"Block $blockId could not be removed as it was not found in either " +
@@ -969,7 +972,7 @@ private[spark] class BlockManager(
       info.synchronized {
         val level = info.level
         if (level.useMemory) { memoryStore.remove(id) }
-        if (level.useDisk) { diskStore.remove(id) }
+        if (level.useDisk) { diskStoreManager.remove(id) }
         if (level.useOffHeap) { tachyonStore.remove(id) }
         iterator.remove()
         logInfo(s"Dropped block $id")
@@ -1040,11 +1043,11 @@ private[spark] class BlockManager(

   def stop(): Unit = {
     blockTransferService.stop()
-    diskBlockManager.stop()
+    diskStoreManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()
-    diskStore.clear()
+    diskStoreManager.clear()
     if (tachyonInitialized) {
       tachyonStore.clear()
     }
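BlockManager no longer constructs a DiskBlockManager and DiskStore itself; it delegates to a DiskStoreManager, whose definition is not part of this diff. Below is a rough sketch of the facade shape implied by the call sites above (the write-path methods putArray/putBytes are omitted for brevity); the constructor arguments and the single-store delegation are assumptions for illustration only:

import java.nio.ByteBuffer
import org.apache.spark.storage.{BlockId, DiskBlockManager, DiskStore}

// Rough sketch of the DiskStoreManager facade used above. A single underlying DiskStore
// stands in for whatever set of stores the real class manages; names other than
// DiskStoreManager/shuffleStore are illustrative.
private[spark] class DiskStoreManagerSketch(underlying: DiskStore, blockFiles: DiskBlockManager) {

  // The shuffle code in the earlier hunks only needs the DiskBlockManager that owns its files.
  class ShuffleStoreHandle { def getDiskBlockManager: DiskBlockManager = blockFiles }
  val shuffleStore = new ShuffleStoreHandle

  def contains(blockId: BlockId): Boolean = underlying.contains(blockId)
  def getSize(blockId: BlockId): Long = underlying.getSize(blockId)
  def getBytes(blockId: BlockId): Option[ByteBuffer] = underlying.getBytes(blockId)
  def remove(blockId: BlockId): Boolean = underlying.remove(blockId)
  def getAllBlocks(): Seq[BlockId] = blockFiles.getAllFiles().map(f => BlockId(f.getName))

  // In the real class, stop()/clear() would shut down and clean up the underlying stores
  // and their local directories; they are no-ops in this sketch.
  def stop(): Unit = ()
  def clear(): Unit = ()
}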
@@ -0,0 +1,158 @@ (new file defining DefaultDiskBlockManager)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random, UUID}

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.util.Utils

/**
 * Creates and maintains the logical mapping between logical blocks and physical on-disk
 * locations. By default, one block is mapped to one file with a name given by its BlockId.
 * However, it is also possible to have a block map to only a segment of a file, by calling
 * mapBlockToFileSegment().
 *
 * Block files are hashed among the directories listed in spark.local.dir (or in
 * SPARK_LOCAL_DIRS, if it's set).
 */
private[spark] class DefaultDiskBlockManager(conf: SparkConf, rootDirs: String)
  extends DiskBlockManager {

  private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
  private val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)

  /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
   * directory, create multiple subdirectories that we will hash files into, in order to avoid
   * having really large inodes at the top level. */
  val localDirs: Array[File] = createLocalDirs(rootDirs)
  if (localDirs.isEmpty) {
    logError("Failed to create any local dir.")
    System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
  }
  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

  addShutdownHook()

  override def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    var subDir = subDirs(dirId)(subDirId)
    if (subDir == null) {
      subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          newDir.mkdir()
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
    }

    new File(subDir, filename)
  }

  /** Check if disk block manager has a block. */
  override def containsBlock(blockId: BlockId): Boolean = {
    getFile(blockId.name).exists()
  }

  /** List all the files currently stored on disk by the disk manager. */
  override def getAllFiles(): Seq[File] = {
    // Get all the files inside the array of array of directories
    subDirs.flatten.filter(_ != null).flatMap { dir =>
      val files = dir.listFiles()
      if (files != null) files else Seq.empty
    }
  }

  /** Produces a unique block id and File suitable for intermediate results. */
  override def createTempBlock(): (TempBlockId, File) = {
    var blockId = new TempBlockId(UUID.randomUUID())
    while (getFile(blockId).exists()) {
      blockId = new TempBlockId(UUID.randomUUID())
    }
    (blockId, getFile(blockId))
  }

  private def createLocalDirs(rootDirs: String): Array[File] = {
    val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
    Utils.createDirs(rootDirs).flatMap { rootDir =>
      var foundLocalDir = false
      var localDir: File = null
      var localDirId: String = null
      var tries = 0
      val rand = new Random()
      while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
        tries += 1
        try {
          localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
          localDir = new File(rootDir, s"spark-local-$localDirId")
          if (!localDir.exists) {
            foundLocalDir = localDir.mkdirs()
          }
        } catch {
          case e: Exception =>
            logWarning(s"Attempt $tries to create local dir $localDir failed", e)
        }
      }
      if (!foundLocalDir) {
        logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." +
          " Ignoring this directory.")
        None
      } else {
        logInfo(s"Created local directory at $localDir")
        Some(localDir)
      }
    }
  }

  private def addShutdownHook() {
    localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
      override def run(): Unit = Utils.logUncaughtExceptions {
        logDebug("Shutdown hook called")
        DefaultDiskBlockManager.this.stop()
      }
    })
  }

  /** Cleanup local dirs and stop shuffle sender. */
  private[spark] def stop() {
    localDirs.foreach { localDir =>
      if (localDir.isDirectory() && localDir.exists()) {
        try {
          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
        } catch {
          case e: Exception =>
            logError(s"Exception while deleting local spark dir: $localDir", e)
        }
      }
    }
  }
}
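To make the directory hashing in getFile above concrete, here is a small self-contained example of the same arithmetic; the hash function is a stand-in for Utils.nonNegativeHash, and the directory counts and example path are illustrative:

object GetFileHashingExample extends App {
  val localDirCount = 2        // e.g. spark.local.dir=/disk1/spark,/disk2/spark
  val subDirsPerLocalDir = 64  // default of spark.diskStore.subDirectories

  val filename = "shuffle_0_1_2.data"
  val hash = filename.hashCode & Int.MaxValue                 // stand-in for Utils.nonNegativeHash
  val dirId = hash % localDirCount                            // which root dir the file lands in
  val subDirId = (hash / localDirCount) % subDirsPerLocalDir  // which of the 64 subdirs under it

  // The resulting path is <localDirs(dirId)>/<subDirId as two hex digits>/<filename>,
  // e.g. /disk1/spark/spark-local-<timestamp>-<rand>/2f/shuffle_0_1_2.data
  println(s"dirId=$dirId, subDir=${"%02x".format(subDirId)}")
}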