From 9d8e4d8df8095b262ee27e92b293dc26c99f61ab Mon Sep 17 00:00:00 2001 From: Raymond Liu Date: Tue, 9 Sep 2014 16:05:28 +0800 Subject: [PATCH] Initial commit for hierarchy disk store --- .../apache/spark/api/python/PythonRDD.scala | 3 +- .../shuffle/FileShuffleBlockManager.scala | 8 +- .../shuffle/IndexShuffleBlockManager.scala | 4 +- .../apache/spark/storage/BlockManager.scala | 39 +-- .../storage/DefaultDiskBlockManager.scala | 158 +++++++++++ .../spark/storage/DefaultDiskStore.scala | 166 ++++++++++++ .../spark/storage/DiskBlockManager.scala | 130 +-------- .../org/apache/spark/storage/DiskStore.scala | 128 +-------- .../spark/storage/DiskStoreManager.scala | 254 ++++++++++++++++++ .../apache/spark/storage/MemoryStore.scala | 3 +- .../org/apache/spark/storage/StoreInfo.scala | 76 ++++++ .../scala/org/apache/spark/util/Utils.scala | 24 +- .../collection/ExternalAppendOnlyMap.scala | 2 +- .../util/collection/ExternalSorter.scala | 5 +- .../spark/storage/BlockManagerSuite.scala | 21 +- .../spark/storage/DiskBlockManagerSuite.scala | 2 +- .../util/collection/ExternalSorterSuite.scala | 16 +- docs/configuration.md | 39 +++ 18 files changed, 774 insertions(+), 304 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/DefaultDiskBlockManager.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/DefaultDiskStore.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/DiskStoreManager.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/StoreInfo.scala diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index ae8010300a50..c590067d0984 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -60,8 +60,7 @@ private[spark] class PythonRDD( override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis val env = SparkEnv.get - val localdir = env.blockManager.diskBlockManager.localDirs.map( - f => f.getPath()).mkString(",") + val localdir = Utils.getRootDirsConf(conf) envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala index 439981d23234..fb6ba4ddd89a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala @@ -118,7 +118,7 @@ class FileShuffleBlockManager(conf: SparkConf) } else { Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - val blockFile = blockManager.diskBlockManager.getFile(blockId) + val blockFile = blockManager.shuffleStore.getDiskBlockManager.getFile(blockId) // Because of previous failures, the shuffle file may already exist on this machine. // If so, remove it. 
if (blockFile.exists) { @@ -154,7 +154,7 @@ class FileShuffleBlockManager(conf: SparkConf) val fileId = shuffleState.nextFileId.getAndIncrement() val files = Array.tabulate[File](numBuckets) { bucketId => val filename = physicalFileName(shuffleId, bucketId, fileId) - blockManager.diskBlockManager.getFile(filename) + blockManager.shuffleStore.getDiskBlockManager.getFile(filename) } val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files) shuffleState.allFileGroups.add(fileGroup) @@ -186,7 +186,7 @@ class FileShuffleBlockManager(conf: SparkConf) } throw new IllegalStateException("Failed to find shuffle block: " + blockId) } else { - val file = blockManager.diskBlockManager.getFile(blockId) + val file = blockManager.shuffleStore.getDiskBlockManager.getFile(blockId) new FileSegmentManagedBuffer(file, 0, file.length) } } @@ -211,7 +211,7 @@ class FileShuffleBlockManager(conf: SparkConf) } else { for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) { val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId) - blockManager.diskBlockManager.getFile(blockId).delete() + blockManager.shuffleStore.getDiskBlockManager.getFile(blockId).delete() } } logInfo("Deleted all files for shuffle " + shuffleId) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala index 4ab34336d3f0..279d8d4e4d2d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala @@ -46,11 +46,11 @@ class IndexShuffleBlockManager extends ShuffleBlockManager { } def getDataFile(shuffleId: Int, mapId: Int): File = { - blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0)) + blockManager.shuffleStore.getDiskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0)) } private def getIndexFile(shuffleId: Int, mapId: Int): File = { - blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0)) + blockManager.shuffleStore.getDiskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0)) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d1bee3d2c033..33dfe64173a1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -67,14 +67,13 @@ private[spark] class BlockManager( blockTransferService.init(this) - val diskBlockManager = new DiskBlockManager(this, conf) - private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] - // Actual storage of where blocks are kept - private var tachyonInitialized = false private[spark] val memoryStore = new MemoryStore(this, maxMemory) - private[spark] val diskStore = new DiskStore(this, diskBlockManager) + private[spark] val diskStoreManager = new DiskStoreManager(this) + private[spark] val shuffleStore = diskStoreManager.shuffleStore + + private var tachyonInitialized = false private[spark] lazy val tachyonStore: TachyonStore = { val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") val appFolderName = conf.get("spark.tachyonStore.folderName") @@ -239,7 +238,11 @@ private[spark] class BlockManager( def getStatus(blockId: BlockId): Option[BlockStatus] = { blockInfo.get(blockId).map { info => val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L - val 
diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L + val diskSize = if (diskStoreManager.contains(blockId)) { + diskStoreManager.getSize(blockId) + } else { + 0L + } // Assume that block is not in Tachyon BlockStatus(info.level, memSize, diskSize, 0L) } @@ -251,7 +254,7 @@ private[spark] class BlockManager( * may not know of). */ def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = { - (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq + (blockInfo.keys ++ diskStoreManager.getAllBlocks()).filter(filter).toSeq } /** @@ -312,13 +315,13 @@ private[spark] class BlockManager( case level => val inMem = level.useMemory && memoryStore.contains(blockId) val inTachyon = level.useOffHeap && tachyonStore.contains(blockId) - val onDisk = level.useDisk && diskStore.contains(blockId) + val onDisk = level.useDisk && diskStoreManager.contains(blockId) val deserialized = if (inMem) level.deserialized else false val replication = if (inMem || inTachyon || onDisk) level.replication else 1 val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication) val memSize = if (inMem) memoryStore.getSize(blockId) else 0L val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L - val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L + val diskSize = if (onDisk) diskStoreManager.getSize(blockId) else 0L BlockStatus(storageLevel, memSize, diskSize, tachyonSize) } } @@ -435,7 +438,7 @@ private[spark] class BlockManager( // Look for block on disk, potentially storing it back in memory if required if (level.useDisk) { logDebug(s"Getting block $blockId from disk") - val bytes: ByteBuffer = diskStore.getBytes(blockId) match { + val bytes: ByteBuffer = diskStoreManager.getBytes(blockId) match { case Some(b) => b case None => throw new BlockException( @@ -694,7 +697,7 @@ private[spark] class BlockManager( (false, tachyonStore) } else if (putLevel.useDisk) { // Don't get back the bytes from put unless we replicate them - (putLevel.replication > 1, diskStore) + (putLevel.replication > 1, diskStoreManager) } else { assert(putLevel == StorageLevel.NONE) throw new BlockException( @@ -861,13 +864,13 @@ private[spark] class BlockManager( val level = info.level // Drop to disk, if storage level requires - if (level.useDisk && !diskStore.contains(blockId)) { + if (level.useDisk && !diskStoreManager.contains(blockId)) { logInfo(s"Writing block $blockId to disk") data match { case Left(elements) => - diskStore.putArray(blockId, elements, level, returnValues = false) + diskStoreManager.putArray(blockId, elements, level, returnValues = false) case Right(bytes) => - diskStore.putBytes(blockId, bytes, level) + diskStoreManager.putBytes(blockId, bytes, level) } blockIsUpdated = true } @@ -932,7 +935,7 @@ private[spark] class BlockManager( info.synchronized { // Removals are idempotent in disk store and memory store. At worst, we get a warning. 
val removedFromMemory = memoryStore.remove(blockId) - val removedFromDisk = diskStore.remove(blockId) + val removedFromDisk = diskStoreManager.remove(blockId) val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { logWarning(s"Block $blockId could not be removed as it was not found in either " + @@ -969,7 +972,7 @@ private[spark] class BlockManager( info.synchronized { val level = info.level if (level.useMemory) { memoryStore.remove(id) } - if (level.useDisk) { diskStore.remove(id) } + if (level.useDisk) { diskStoreManager.remove(id) } if (level.useOffHeap) { tachyonStore.remove(id) } iterator.remove() logInfo(s"Dropped block $id") @@ -1040,11 +1043,11 @@ private[spark] class BlockManager( def stop(): Unit = { blockTransferService.stop() - diskBlockManager.stop() + diskStoreManager.stop() actorSystem.stop(slaveActor) blockInfo.clear() memoryStore.clear() - diskStore.clear() + diskStoreManager.clear() if (tachyonInitialized) { tachyonStore.clear() } diff --git a/core/src/main/scala/org/apache/spark/storage/DefaultDiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DefaultDiskBlockManager.scala new file mode 100644 index 000000000000..ad13f8f5791c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/DefaultDiskBlockManager.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random, UUID} + +import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.executor.ExecutorExitCode +import org.apache.spark.util.Utils + +/** + * Creates and maintains the logical mapping between logical blocks and physical on-disk + * locations. By default, one block is mapped to one file with a name given by its BlockId. + * However, it is also possible to have a block map to only a segment of a file, by calling + * mapBlockToFileSegment(). + * + * Block files are hashed among the directories listed in spark.local.dir (or in + * SPARK_LOCAL_DIRS, if it's set). + */ +private[spark] class DefaultDiskBlockManager(conf: SparkConf, rootDirs: String) + extends DiskBlockManager { + + private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 + private val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64) + + /* Create one local directory for each path mentioned in spark.local.dir; then, inside this + * directory, create multiple subdirectories that we will hash files into, in order to avoid + * having really large inodes at the top level. 
*/ + val localDirs: Array[File] = createLocalDirs(rootDirs) + if (localDirs.isEmpty) { + logError("Failed to create any local dir.") + System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + } + private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + + addShutdownHook() + + override def getFile(filename: String): File = { + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % localDirs.length + val subDirId = (hash / localDirs.length) % subDirsPerLocalDir + + // Create the subdirectory if it doesn't already exist + var subDir = subDirs(dirId)(subDirId) + if (subDir == null) { + subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) + newDir.mkdir() + subDirs(dirId)(subDirId) = newDir + newDir + } + } + } + + new File(subDir, filename) + } + + /** Check if disk block manager has a block. */ + override def containsBlock(blockId: BlockId): Boolean = { + getFile(blockId.name).exists() + } + + /** List all the files currently stored on disk by the disk manager. */ + override def getAllFiles(): Seq[File] = { + // Get all the files inside the array of array of directories + subDirs.flatten.filter(_ != null).flatMap { dir => + val files = dir.listFiles() + if (files != null) files else Seq.empty + } + } + + /** Produces a unique block id and File suitable for intermediate results. */ + override def createTempBlock(): (TempBlockId, File) = { + var blockId = new TempBlockId(UUID.randomUUID()) + while (getFile(blockId).exists()) { + blockId = new TempBlockId(UUID.randomUUID()) + } + (blockId, getFile(blockId)) + } + + private def createLocalDirs(rootDirs: String): Array[File] = { + val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") + Utils.createDirs(rootDirs).flatMap { rootDir => + var foundLocalDir = false + var localDir: File = null + var localDirId: String = null + var tries = 0 + val rand = new Random() + while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + tries += 1 + try { + localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) + localDir = new File(rootDir, s"spark-local-$localDirId") + if (!localDir.exists) { + foundLocalDir = localDir.mkdirs() + } + } catch { + case e: Exception => + logWarning(s"Attempt $tries to create local dir $localDir failed", e) + } + } + if (!foundLocalDir) { + logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." + + " Ignoring this directory.") + None + } else { + logInfo(s"Created local directory at $localDir") + Some(localDir) + } + } + } + + private def addShutdownHook() { + localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { + override def run(): Unit = Utils.logUncaughtExceptions { + logDebug("Shutdown hook called") + DefaultDiskBlockManager.this.stop() + } + }) + } + + /** Cleanup local dirs and stop shuffle sender. 
 */
+  private[spark] def stop() {
+    localDirs.foreach { localDir =>
+      if (localDir.isDirectory() && localDir.exists()) {
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+        } catch {
+          case e: Exception =>
+            logError(s"Exception while deleting local spark dir: $localDir", e)
+        }
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/DefaultDiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DefaultDiskStore.scala
new file mode 100644
index 000000000000..d43a34782de1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/DefaultDiskStore.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileOutputStream, RandomAccessFile}
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel.MapMode
+
+import org.apache.spark.Logging
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.Utils
+
+/**
+ * Stores BlockManager blocks as files on disk.
+ */
+private[spark] class DefaultDiskStore(id: String, blockManager: BlockManager)
+  extends DiskStore(id, blockManager) with Logging {
+
+  private val conf = blockManager.conf
+  private val storeConfPrefix = "spark.defaultDiskStore." + id
+
+  /**
+   * If this disk store's dir is configured explicitly, the same value is used across all deploy
+   * modes, so make sure the paths are available on every node. Otherwise the default rootDirs
+   * value is used, which may differ between deploy modes (e.g. in YARN mode it is decided by
+   * the YARN configuration).
+   */
+  private val dataDirs = conf.getOption(storeConfPrefix + ".dir").getOrElse(
+    Utils.getRootDirsConf(conf)
+  )
+  private val diskBlockManager = new DefaultDiskBlockManager(conf, dataDirs)
+
+  private val minMemoryMapBytes = conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
+
+  override def getSize(blockId: BlockId): Long = {
+    diskBlockManager.getFile(blockId.name).length
+  }
+
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+    // So that we do not modify the input offsets!
+ // duplicate does not copy buffer, so inexpensive + val bytes = _bytes.duplicate() + logDebug(s"Attempting to put block $blockId") + val startTime = System.currentTimeMillis + val file = diskBlockManager.getFile(blockId) + val channel = new FileOutputStream(file).getChannel + while (bytes.remaining > 0) { + channel.write(bytes) + } + channel.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored as %s file on disk in %d ms".format( + file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime)) + PutResult(bytes.limit(), Right(bytes.duplicate())) + } + + override def putArray( + blockId: BlockId, + values: Array[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + putIterator(blockId, values.toIterator, level, returnValues) + } + + override def putIterator( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + + logDebug(s"Attempting to write values for block $blockId") + val startTime = System.currentTimeMillis + val file = diskBlockManager.getFile(blockId) + val outputStream = new FileOutputStream(file) + blockManager.dataSerializeStream(blockId, outputStream, values) + val length = file.length + + val timeTaken = System.currentTimeMillis - startTime + logDebug("Block %s stored as %s file on disk in %d ms".format( + file.getName, Utils.bytesToString(length), timeTaken)) + + if (returnValues) { + // Return a byte buffer for the contents of the file + val buffer = getBytes(blockId).get + PutResult(length, Right(buffer)) + } else { + PutResult(length, null) + } + } + + private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { + val channel = new RandomAccessFile(file, "r").getChannel + + try { + // For small files, directly read rather than memory map + if (length < minMemoryMapBytes) { + val buf = ByteBuffer.allocate(length.toInt) + channel.read(buf, offset) + buf.flip() + Some(buf) + } else { + Some(channel.map(MapMode.READ_ONLY, offset, length)) + } + } finally { + channel.close() + } + } + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val file = diskBlockManager.getFile(blockId.name) + getBytes(file, 0, file.length) + } + + def getBytes(segment: FileSegment): Option[ByteBuffer] = { + getBytes(segment.file, segment.offset, segment.length) + } + + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) + } + + /** + * A version of getValues that allows a custom serializer. This is used as part of the + * shuffle short-circuit code. + */ + def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { + // TODO: Should bypass getBytes and use a stream based implementation, so that + // we won't use a lot of memory during e.g. external sort merge. + getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) + } + + override def remove(blockId: BlockId): Boolean = { + val file = diskBlockManager.getFile(blockId.name) + // If consolidation mode is used With HashShuffleMananger, the physical filename for the block + // is different from blockId.name. So the file returns here will not be exist, thus we avoid to + // delete the whole consolidated file by mistake. 
+ if (file.exists()) { + file.delete() + } else { + false + } + } + + override def contains(blockId: BlockId): Boolean = { + val file = diskBlockManager.getFile(blockId.name) + file.exists() + } + + override def getDiskBlockManager(): DiskBlockManager = { + diskBlockManager + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index a715594f198c..dd35e1420353 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -18,80 +18,21 @@ package org.apache.spark.storage import java.io.File -import java.text.SimpleDateFormat -import java.util.{Date, Random, UUID} import org.apache.spark.{SparkConf, Logging} -import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.util.Utils -/** - * Creates and maintains the logical mapping between logical blocks and physical on-disk - * locations. By default, one block is mapped to one file with a name given by its BlockId. - * However, it is also possible to have a block map to only a segment of a file, by calling - * mapBlockToFileSegment(). - * - * Block files are hashed among the directories listed in spark.local.dir (or in - * SPARK_LOCAL_DIRS, if it's set). - */ -private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf) +private[spark] abstract class DiskBlockManager() extends Logging { - private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - private val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64) - - /* Create one local directory for each path mentioned in spark.local.dir; then, inside this - * directory, create multiple subdirectories that we will hash files into, in order to avoid - * having really large inodes at the top level. */ - val localDirs: Array[File] = createLocalDirs(conf) - if (localDirs.isEmpty) { - logError("Failed to create any local dir.") - System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) - } - private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) - - addShutdownHook() - - def getFile(filename: String): File = { - // Figure out which local directory it hashes to, and which subdirectory in that - val hash = Utils.nonNegativeHash(filename) - val dirId = hash % localDirs.length - val subDirId = (hash / localDirs.length) % subDirsPerLocalDir - - // Create the subdirectory if it doesn't already exist - var subDir = subDirs(dirId)(subDirId) - if (subDir == null) { - subDir = subDirs(dirId).synchronized { - val old = subDirs(dirId)(subDirId) - if (old != null) { - old - } else { - val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) - newDir.mkdir() - subDirs(dirId)(subDirId) = newDir - newDir - } - } - } - - new File(subDir, filename) - } + def getFile(filename: String): File def getFile(blockId: BlockId): File = getFile(blockId.name) /** Check if disk block manager has a block. */ - def containsBlock(blockId: BlockId): Boolean = { - getFile(blockId.name).exists() - } + def containsBlock(blockId: BlockId): Boolean /** List all the files currently stored on disk by the disk manager. 
*/ - def getAllFiles(): Seq[File] = { - // Get all the files inside the array of array of directories - subDirs.flatten.filter(_ != null).flatMap { dir => - val files = dir.listFiles() - if (files != null) files else Seq.empty - } - } + def getAllFiles(): Seq[File] /** List all the blocks currently stored on disk by the disk manager. */ def getAllBlocks(): Seq[BlockId] = { @@ -99,67 +40,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon } /** Produces a unique block id and File suitable for intermediate results. */ - def createTempBlock(): (TempBlockId, File) = { - var blockId = new TempBlockId(UUID.randomUUID()) - while (getFile(blockId).exists()) { - blockId = new TempBlockId(UUID.randomUUID()) - } - (blockId, getFile(blockId)) - } - - private def createLocalDirs(conf: SparkConf): Array[File] = { - val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir => - var foundLocalDir = false - var localDir: File = null - var localDirId: String = null - var tries = 0 - val rand = new Random() - while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { - tries += 1 - try { - localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - localDir = new File(rootDir, s"spark-local-$localDirId") - if (!localDir.exists) { - foundLocalDir = localDir.mkdirs() - } - } catch { - case e: Exception => - logWarning(s"Attempt $tries to create local dir $localDir failed", e) - } - } - if (!foundLocalDir) { - logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." + - " Ignoring this directory.") - None - } else { - logInfo(s"Created local directory at $localDir") - Some(localDir) - } - } - } - - private def addShutdownHook() { - localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { - override def run(): Unit = Utils.logUncaughtExceptions { - logDebug("Shutdown hook called") - DiskBlockManager.this.stop() - } - }) - } + def createTempBlock(): (TempBlockId, File) /** Cleanup local dirs and stop shuffle sender. */ - private[spark] def stop() { - localDirs.foreach { localDir => - if (localDir.isDirectory() && localDir.exists()) { - try { - if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) - } catch { - case e: Exception => - logError(s"Exception while deleting local spark dir: $localDir", e) - } - } - } - } + private[spark] def stop() } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index e9304f6bb45d..fdf14e97cb4c 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,133 +17,13 @@ package org.apache.spark.storage -import java.io.{File, FileOutputStream, RandomAccessFile} -import java.nio.ByteBuffer -import java.nio.channels.FileChannel.MapMode - import org.apache.spark.Logging -import org.apache.spark.serializer.Serializer -import org.apache.spark.util.Utils +import java.io.File +import java.nio.ByteBuffer -/** - * Stores BlockManager blocks on disk. 
- */ -private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) +abstract class DiskStore(val id: String, blockManager: BlockManager) extends BlockStore(blockManager) with Logging { - val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L) - - override def getSize(blockId: BlockId): Long = { - diskManager.getFile(blockId.name).length - } - - override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = { - // So that we do not modify the input offsets ! - // duplicate does not copy buffer, so inexpensive - val bytes = _bytes.duplicate() - logDebug(s"Attempting to put block $blockId") - val startTime = System.currentTimeMillis - val file = diskManager.getFile(blockId) - val channel = new FileOutputStream(file).getChannel - while (bytes.remaining > 0) { - channel.write(bytes) - } - channel.close() - val finishTime = System.currentTimeMillis - logDebug("Block %s stored as %s file on disk in %d ms".format( - file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime)) - PutResult(bytes.limit(), Right(bytes.duplicate())) - } - - override def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIterator(blockId, values.toIterator, level, returnValues) - } - - override def putIterator( - blockId: BlockId, - values: Iterator[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - - logDebug(s"Attempting to write values for block $blockId") - val startTime = System.currentTimeMillis - val file = diskManager.getFile(blockId) - val outputStream = new FileOutputStream(file) - blockManager.dataSerializeStream(blockId, outputStream, values) - val length = file.length - - val timeTaken = System.currentTimeMillis - startTime - logDebug("Block %s stored as %s file on disk in %d ms".format( - file.getName, Utils.bytesToString(length), timeTaken)) - - if (returnValues) { - // Return a byte buffer for the contents of the file - val buffer = getBytes(blockId).get - PutResult(length, Right(buffer)) - } else { - PutResult(length, null) - } - } - - private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { - val channel = new RandomAccessFile(file, "r").getChannel - - try { - // For small files, directly read rather than memory map - if (length < minMemoryMapBytes) { - val buf = ByteBuffer.allocate(length.toInt) - channel.read(buf, offset) - buf.flip() - Some(buf) - } else { - Some(channel.map(MapMode.READ_ONLY, offset, length)) - } - } finally { - channel.close() - } - } - - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val file = diskManager.getFile(blockId.name) - getBytes(file, 0, file.length) - } - - def getBytes(segment: FileSegment): Option[ByteBuffer] = { - getBytes(segment.file, segment.offset, segment.length) - } - - override def getValues(blockId: BlockId): Option[Iterator[Any]] = { - getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) - } - - /** - * A version of getValues that allows a custom serializer. This is used as part of the - * shuffle short-circuit code. - */ - def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { - // TODO: Should bypass getBytes and use a stream based implementation, so that - // we won't use a lot of memory during e.g. external sort merge. 
- getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) - } - - override def remove(blockId: BlockId): Boolean = { - val file = diskManager.getFile(blockId.name) - // If consolidation mode is used With HashShuffleMananger, the physical filename for the block - // is different from blockId.name. So the file returns here will not be exist, thus we avoid to - // delete the whole consolidated file by mistake. - if (file.exists()) { - file.delete() - } else { - false - } - } + def getDiskBlockManager(): DiskBlockManager - override def contains(blockId: BlockId): Boolean = { - val file = diskManager.getFile(blockId.name) - file.exists() - } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStoreManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskStoreManager.scala new file mode 100644 index 000000000000..af77f54de2c2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/DiskStoreManager.scala @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.nio.ByteBuffer + +import org.apache.spark.Logging +import org.apache.spark.serializer.Serializer +import org.apache.spark.util.SizeEstimator +import java.io.{RandomAccessFile, File} +import java.nio.channels.FileChannel.MapMode + +/** + * Manage DiskStores. + */ +class DiskStoreManager(blockManager: BlockManager) + extends BlockStore(blockManager) with Logging { + + private val defaultID = "defaultDiskStore1" + private val defaultDiskStoreClass = "org.apache.spark.storage.DefaultDiskStore" + + def conf = blockManager.conf + + val diskStoreIDs = conf.getOption("spark.diskStore.ids").map(_.split(",")) + val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L) + + val allStoreInfos = diskStoreIDs match { + case Some(ids) => + ids.map { id => + val className = conf.get("spark.diskStore." + id + ".class") + val size = conf.getLong("spark.diskStore." + id + ".size", 0) + val clazz = Class.forName(className) + val cons = clazz.getConstructor(classOf[String], classOf[BlockManager]) + val store = cons.newInstance(id, blockManager).asInstanceOf[DiskStore] + new StoreInfo(id, store, size) + } + + case None => + val clazz = Class.forName(defaultDiskStoreClass) + val cons = clazz.getConstructor(classOf[String], classOf[BlockManager]) + val store = cons.newInstance(defaultID, blockManager).asInstanceOf[DiskStore] + Array(new StoreInfo(defaultID, store, 0)) + } + + // if there is only one store in the store chain, use singleStore for shortcut path + val singleStore = { + if (allStoreInfos.length > 1) null else allStoreInfos.last.store + } + + // the last store in the store chains. 
+  val lastStore = allStoreInfos.last.store
+
+  // Shuffle code reads and writes files directly in many places, and its data size is hard to
+  // predict. Without heavy modification (including shuffle map output, spill, fetcher, etc.)
+  // it cannot use a hierarchical store, so all shuffle data is put on the last store for now.
+  val shuffleStore = lastStore
+
+  // Store infos in the chain, excluding the last one.
+  val priorityStoreInfos = allStoreInfos.take(allStoreInfos.length - 1)
+
+  if (allStoreInfos.last.totalSize != 0) {
+    logWarning(("The last disk store %s should not have a limited size quota.").
+      format(allStoreInfos.last.id))
+  }
+
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+    if (singleStore != null) {
+      return singleStore.putBytes(blockId, _bytes, level)
+    }
+
+    for (info <- priorityStoreInfos) {
+      if (info.tryUse(_bytes.limit())) {
+        val result = info.store.putBytes(blockId, _bytes, level)
+        if (result.size <= 0) {
+          info.free(_bytes.limit())
+        } else {
+          return result
+        }
+      }
+    }
+
+    lastStore.putBytes(blockId, _bytes, level)
+  }
+
+  override def putArray(
+      blockId: BlockId,
+      values: Array[Any],
+      level: StorageLevel,
+      returnValues: Boolean): PutResult = {
+    putIterator(blockId, values.toIterator, level, returnValues)
+  }
+
+  override def putIterator(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean): PutResult = {
+
+    if (singleStore != null) {
+      return singleStore.putIterator(blockId, values, level, returnValues)
+    }
+
+    // The size estimate used here is rough; there is no efficient way to know the exact
+    // serialized size of the iterator up front.
+
+    // sizeEstimate is in most cases several times larger than the serialized data's size.
+    // This can waste disk quota when a single block is very large: we may decide that it does
+    // not fit even though it actually would. That is still better than overrunning the quota.
+
+    val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+
+    for (info <- priorityStoreInfos) {
+      if (info.tryUse(sizeEstimate)) {
+        val result = info.store.putIterator(blockId, values, level, returnValues)
+        if (result.size <= 0) {
+          info.free(sizeEstimate)
+        } else {
+          // Now correct the accounting with the actual size.
+          val sizeDiff = sizeEstimate - result.size
+          if (sizeDiff > 0) {
+            info.free(sizeDiff)
+          } else {
+            // Should be rare to get here. Call use instead of tryUse, since the data has
+            // already been written to disk, even though this may push the store beyond its
+            // size quota.
+ info.use(math.abs(sizeDiff)) + } + return result + } + } + } + + lastStore.putIterator(blockId, values, level, returnValues) + } + + override def getSize(blockId: BlockId): Long = { + if (singleStore != null) { + return singleStore.getSize(blockId) + } + + for (info <- priorityStoreInfos) { + if (info.store.contains(blockId)) { + return info.store.getSize(blockId) + } + } + + lastStore.getSize(blockId) + } + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + if (singleStore != null) { + return singleStore.getBytes(blockId) + } + + for (info <- priorityStoreInfos) { + if (info.store.contains(blockId)) { + return info.store.getBytes(blockId) + } + } + + lastStore.getBytes(blockId) + } + + private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { + val channel = new RandomAccessFile(file, "r").getChannel + + try { + // For small files, directly read rather than memory map + if (length < minMemoryMapBytes) { + val buf = ByteBuffer.allocate(length.toInt) + channel.read(buf, offset) + buf.flip() + Some(buf) + } else { + Some(channel.map(MapMode.READ_ONLY, offset, length)) + } + } finally { + channel.close() + } + } + + def getBytes(segment: FileSegment): Option[ByteBuffer] = { + getBytes(segment.file, segment.offset, segment.length) + } + + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) + } + + /** + * A version of getValues that allows a custom serializer. This is used as part of the + * shuffle short-circuit code. + */ + def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { + // TODO: Should bypass getBytes and use a stream based implementation, so that + // we won't use a lot of memory during e.g. external sort merge. 
+ getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) + } + + override def remove(blockId: BlockId): Boolean = { + if (singleStore != null) { + return singleStore.remove(blockId) + } + + for (info <- priorityStoreInfos) { + if (info.store.contains(blockId)) { + return info.store.remove(blockId) + } + } + + lastStore.remove(blockId) + } + + override def contains(blockId: BlockId): Boolean = { + for (info <- allStoreInfos) { + if (info.store.contains(blockId)) { + return true + } + } + false + } + + override def clear() = { + for (info <- allStoreInfos) { + info.store.clear() + } + } + + def getAllBlocks(): Seq[BlockId] = { + allStoreInfos.toSeq.flatMap(_.store.getDiskBlockManager.getAllBlocks) + } + + private[spark] def stop() { + allStoreInfos.foreach(_.store.getDiskBlockManager.stop) + } + +} diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 0a09c24d6187..032ed29f2f70 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -136,7 +136,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) s"Free memory is $freeMemory bytes.") if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") - val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) + val res = blockManager.diskStoreManager.putIterator( + blockId, iteratorValues, level, returnValues) PutResult(res.size, res.data, droppedBlocks) } else { PutResult(0, Left(iteratorValues), droppedBlocks) diff --git a/core/src/main/scala/org/apache/spark/storage/StoreInfo.scala b/core/src/main/scala/org/apache/spark/storage/StoreInfo.scala new file mode 100644 index 000000000000..7c624b1c6cf1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/StoreInfo.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +import org.apache.spark.util.Utils + +class StoreInfo( + val id: String, + val store: DiskStore, + val totalSize: Long = 0L) { + + private var currentSize = 0L + private val currentSizeLock = new AnyRef + + def freeSize = totalSize - currentSize + + override def toString = { + import Utils.bytesToString + ("Store %s : TotalSize: %s; CurrentSize: %s").format( + id, bytesToString(totalSize), bytesToString(currentSize)) + } + + def tryUse(size: Long): Boolean = { + // 0 for totalSize means unlimited volume + if (totalSize == 0) { + return true + } + + currentSizeLock.synchronized { + if (freeSize >= size) { + currentSize += size + true + } else { + false + } + } + } + + def use(size: Long): Unit = { + if (totalSize == 0) { + return + } + + currentSizeLock.synchronized { + currentSize += size + } + } + + def free(size: Long): Unit = { + if (totalSize == 0) { + return + } + + currentSizeLock.synchronized { + currentSize -= size + } + } + +} + + diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0ae28f911e30..e4c81b71f3a9 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -43,6 +43,10 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.ExecutorUncaughtExceptionHandler import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import scala.Array +import scala.Predef._ +import scala.Some +import org.apache.spark.util.CallSite /** CallSite represents a place in user code. It can have a short and a long form. */ private[spark] case class CallSite(shortForm: String, longForm: String) @@ -451,6 +455,12 @@ private[spark] object Utils extends Logging { * If no directories could be created, this will return an empty list. */ private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = { + val rootDirs = getRootDirsConf(conf) + logDebug(s"Getting/creating local root dirs at '$rootDirs'") + createDirs(rootDirs) + } + + private[spark] def getRootDirsConf(conf: SparkConf): String = { val confValue = if (isRunningInYarnContainer(conf)) { // If we are in yarn mode, systems can have different disk layouts so we must set it // to what Yarn on this system said was available. @@ -459,21 +469,23 @@ private[spark] object Utils extends Logging { Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse( conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) } - val rootDirs = confValue.split(',') - logDebug(s"Getting/creating local root dirs at '$confValue'") + confValue + } - rootDirs.flatMap { rootDir => - val localDir: File = new File(rootDir) + private[spark] def createDirs(dirs: String): Array[String] = { + dirs.split(',').flatMap { dir => + val localDir: File = new File(dir) val foundLocalDir = localDir.exists || localDir.mkdirs() if (!foundLocalDir) { - logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.") + logError(s"Failed to create local dir in $dir. Ignoring this directory.") None } else { - Some(rootDir) + Some(dir) } } } + /** Get the Yarn approved local directories. 
*/ private def getYarnLocalDirs(conf: SparkConf): String = { // Hadoop 0.23 and 2.x have different Environment variable names for the diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 8a015c1d26a9..6d35ece73523 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -71,8 +71,8 @@ class ExternalAppendOnlyMap[K, V, C]( private var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] private val sparkConf = SparkEnv.get.conf - private val diskBlockManager = blockManager.diskBlockManager private val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager + private val diskBlockManager = blockManager.shuffleStore.getDiskBlockManager // Number of pairs inserted since last spill; note that we count them even if a value is merged // with a previous key in case we're doing something like groupBy where the result grows diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 782b979e2e93..3fe0b69f2221 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -85,7 +85,7 @@ private[spark] class ExternalSorter[K, V, C]( private val shouldPartition = numPartitions > 1 private val blockManager = SparkEnv.get.blockManager - private val diskBlockManager = blockManager.diskBlockManager + private val diskBlockManager = blockManager.shuffleStore.getDiskBlockManager private val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager private val ser = Serializer.getSerializer(serializer) private val serInstance = ser.newInstance() @@ -789,7 +789,8 @@ private[spark] class ExternalSorter[K, V, C]( if (writer.isOpen) { writer.commitAndClose() } - blockManager.diskStore.getValues(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]] + blockManager.diskStoreManager.getValues(writer.blockId, ser).get + .asInstanceOf[Iterator[Product2[K, C]]] } def stop(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e251660dae5d..318b1905df7a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -823,16 +823,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // This sequence of mocks makes these tests fairly brittle. It would // be nice to refactor classes involved in disk storage in a way that // allows for easier testing. 
+ val blockManager = mock(classOf[BlockManager]) when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString)) - val diskBlockManager = new DiskBlockManager(blockManager, conf) - - val diskStoreMapped = new DiskStore(blockManager, diskBlockManager) + val diskStoreMapped = new DefaultDiskStore("defaultStore1", blockManager) diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY) val mapped = diskStoreMapped.getBytes(blockId).get when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 1000).toString)) - val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager) + val diskStoreNotMapped = new DefaultDiskStore("defaultStore1", blockManager) diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY) val notMapped = diskStoreNotMapped.getBytes(blockId).get @@ -896,7 +895,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter case _ => fail("Updated block is neither list2 nor list4") } } - assert(store.diskStore.contains("list2"), "list2 was not in disk store") + assert(store.diskStoreManager.contains("list2"), "list2 was not in disk store") assert(store.memoryStore.contains("list4"), "list4 was not in memory store") // No updated blocks - list5 is too big to fit in store and nothing is kicked out @@ -912,11 +911,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(!store.memoryStore.contains("list5"), "list5 was in memory store") // disk store contains only list2 - assert(!store.diskStore.contains("list1"), "list1 was in disk store") - assert(store.diskStore.contains("list2"), "list2 was not in disk store") - assert(!store.diskStore.contains("list3"), "list3 was in disk store") - assert(!store.diskStore.contains("list4"), "list4 was in disk store") - assert(!store.diskStore.contains("list5"), "list5 was in disk store") + assert(!store.diskStoreManager.contains("list1"), "list1 was in disk store") + assert(store.diskStoreManager.contains("list2"), "list2 was not in disk store") + assert(!store.diskStoreManager.contains("list3"), "list3 was in disk store") + assert(!store.diskStoreManager.contains("list4"), "list4 was in disk store") + assert(!store.diskStoreManager.contains("list5"), "list5 was in disk store") } test("query block statuses") { @@ -1151,7 +1150,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store = makeBlockManager(12000) val memAndDisk = StorageLevel.MEMORY_AND_DISK val memoryStore = store.memoryStore - val diskStore = store.diskStore + val diskStore = store.diskStoreManager val smallList = List.fill(40)(new Array[Byte](100)) val bigList = List.fill(40)(new Array[Byte](1000)) def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index e4522e00a622..28d0179e8f1c 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -64,7 +64,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before override def beforeEach() { val conf = testConf.clone conf.set("spark.local.dir", rootDirs) - diskBlockManager = new DiskBlockManager(blockManager, conf) + diskBlockManager = new DefaultDiskBlockManager(conf, rootDirs) } override def afterEach() { diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 706faed980f3..29430520eb4e 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -135,7 +135,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe None, Some(new HashPartitioner(7)), Some(ord), None) assertDidNotBypassMergeSort(sorter) sorter.insertAll(elements) - assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled + assert(sc.env.blockManager.shuffleStore.getDiskBlockManager.getAllFiles().length > 0) // Make sure it spilled val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList)) assert(iter.next() === (0, Nil)) assert(iter.next() === (1, List((1, 1)))) @@ -159,7 +159,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe None, Some(new HashPartitioner(7)), None, None) assertBypassedMergeSort(sorter) sorter.insertAll(elements) - assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled + assert(sc.env.blockManager.shuffleStore.getDiskBlockManager.getAllFiles().length > 0) // Make sure it spilled val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList)) assert(iter.next() === (0, Nil)) assert(iter.next() === (1, List((1, 1)))) @@ -318,7 +318,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val diskBlockManager = SparkEnv.get.blockManager.shuffleStore.getDiskBlockManager val ord = implicitly[Ordering[Int]] @@ -345,7 +345,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val diskBlockManager = SparkEnv.get.blockManager.shuffleStore.getDiskBlockManager val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) assertBypassedMergeSort(sorter) @@ -368,7 +368,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val diskBlockManager = SparkEnv.get.blockManager.shuffleStore.getDiskBlockManager val ord = implicitly[Ordering[Int]] @@ -393,7 +393,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) - val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val diskBlockManager = SparkEnv.get.blockManager.shuffleStore.getDiskBlockManager val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) assertBypassedMergeSort(sorter) @@ -415,7 +415,7 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext with PrivateMe
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
-    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
+    val diskBlockManager = SparkEnv.get.blockManager.shuffleStore.getDiskBlockManager

     val data = sc.parallelize(0 until 100000, 2).map(i => (i, i))
     assert(data.reduceByKey(_ + _).count() === 100000)
@@ -430,7 +430,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
-    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
+    val diskBlockManager = SparkEnv.get.blockManager.shuffleStore.getDiskBlockManager

     val data = sc.parallelize(0 until 100000, 2).map(i => {
       if (i == 99990) {
diff --git a/docs/configuration.md b/docs/configuration.md
index 36178efb9710..15002ee754bb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -534,6 +534,45 @@ Apart from these, the following properties are also available, and may be useful
     storage space to unroll the new block in its entirety.
   </td>
 </tr>
+<tr>
+  <td><code>spark.diskStore.ids</code></td>
+  <td>defaultDiskStore1</td>
+  <td>
+    A comma-separated list of ids for the hierarchy of disk stores. The order of the ids
+    determines the priority of the corresponding stores: Spark writes data to the stores in
+    list order, and once a store reaches its quota limit, further data is written to the next
+    store in the list. The last store in the list is always treated as having no quota limit.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.diskStore.specificID.class</code></td>
+  <td>none</td>
+  <td>
+    The class that implements the disk store identified by <code>specificID</code>.
+    <code>specificID</code> refers to one of the ids listed in <code>spark.diskStore.ids</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.diskStore.specificID.size</code></td>
+  <td>0</td>
+  <td>
+    The disk size quota, in bytes, of the disk store identified by <code>specificID</code>.
+    The default of 0 means no limit.
+    <code>specificID</code> refers to one of the ids listed in <code>spark.diskStore.ids</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.defaultDiskStore.specificID.dir</code></td>
+  <td>spark.local.dir</td>
+  <td>
+    The directories of the default disk store identified by <code>specificID</code>. Can be a
+    comma-separated list of multiple directories. Note that this is only used by
+    <code>org.apache.spark.storage.DefaultDiskStore</code>; other disk store implementations do
+    not necessarily use it.
+    <code>specificID</code> refers to one of the ids listed in <code>spark.diskStore.ids</code>.
+  </td>
+</tr>
 <tr>
   <td><code>spark.tachyonStore.baseDir</code></td>
   <td>System.getProperty("java.io.tmpdir")</td>
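To make these properties concrete, below is a minimal sketch of how a two-tier hierarchy might be configured with the keys introduced by this patch. The store ids (`ssdStore`, `hddStore`), the quota size, and the directory paths are hypothetical, and the sketch assumes `DefaultDiskStore` is used for both tiers; it illustrates the intended configuration shape and is not part of the patch itself.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical two-tier layout: a size-capped SSD store is tried first, spilling over to an
// unlimited HDD store. Ids, quota size, and paths are made up for illustration only.
val conf = new SparkConf()
  .setAppName("hierarchy-disk-store-example")
  // Stores are tried in the order listed; the last one must have no size quota (size = 0).
  .set("spark.diskStore.ids", "ssdStore,hddStore")
  .set("spark.diskStore.ssdStore.class", "org.apache.spark.storage.DefaultDiskStore")
  .set("spark.diskStore.ssdStore.size", (100L * 1024 * 1024 * 1024).toString) // 100 GB quota
  .set("spark.defaultDiskStore.ssdStore.dir", "/mnt/ssd1/spark,/mnt/ssd2/spark")
  .set("spark.diskStore.hddStore.class", "org.apache.spark.storage.DefaultDiskStore")
  .set("spark.diskStore.hddStore.size", "0") // 0 = unlimited; shuffle data also lands here
  .set("spark.defaultDiskStore.hddStore.dir", "/mnt/hdd1/spark,/mnt/hdd2/spark")

val sc = new SparkContext(conf)
```

With this layout, DiskStoreManager tries ssdStore first for each block and falls back to hddStore once the SSD quota is exhausted, while shuffle files always go to the last (unlimited) store, matching the shuffleStore shortcut in DiskStoreManager above.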