12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -284,6 +284,18 @@ package object config {
.intConf
.createWithDefault(64)

private[spark] val DISK_STORE_BLACKLIST_TIMEOUT =
ConfigBuilder("spark.diskStore.blacklist.timeoutMs")
.doc("The timeout in milliseconds to wait before moving blacklisted local directory " +
"out from the blacklist.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1d")

private[spark] val DISK_STORE_MAX_ATTEMPTS =
ConfigBuilder("spark.diskStore.maxAttempts")
.intConf
.createWithDefault(3)

private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH =
ConfigBuilder("spark.block.failures.beforeLocationRefresh")
.doc("Max number of failures before this block manager refreshes " +
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -18,12 +18,17 @@
package org.apache.spark.storage

import java.io.{File, IOException}
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkConf
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}

/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -32,9 +37,12 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
*/
private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
private[spark] class DiskBlockManager(conf: SparkConf,
deleteFilesOnStop: Boolean, clock: Clock = new SystemClock()) extends Logging {

private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
private[spark] val maxAttempts = conf.get(config.DISK_STORE_MAX_ATTEMPTS)
private[spark] val blacklistTimeout = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT)

/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -48,32 +56,77 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

private[spark] val blacklistedDirs = ArrayBuffer[File]()
private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long]
// Maps a file name to the index of a healthy local directory it was migrated to after its
// hashed directory was found to be bad. This map should stay small enough to hold in memory.
private[spark] val migratedDirIdIndex = new ConcurrentHashMap[String, Int].asScala

private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
var mostRecentFailure: Exception = null
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val dirId = migratedDirIdIndex.getOrElse(filename, hash % localDirs.length)
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
// Drop directories whose blacklist timeout has expired
val now = clock.getTimeMillis()
blacklistedDirs.synchronized {
val unblacklisted = blacklistedDirs.filter(now >= dirToBlacklistExpiryTime(_))
unblacklisted.foreach { dir =>
blacklistedDirs -= dir
dirToBlacklistExpiryTime.remove(dir)
}
}

val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
assert(!migratedDirIdIndex.contains(filename))
var newDir: File = null
for (attempt <- 0 until maxAttempts if newDir == null) {
val goodDirId = blacklistedDirs.synchronized {
val isBlacklisted = blacklistedDirs.contains(localDirs(dirId))
if (isBlacklisted) {
localDirs.indexWhere(!blacklistedDirs.contains(_))
} else {
dirId
}
}
try {
if (goodDirId < 0) {
throw new IOException(s"Cannot store file $filename anywhere due to " +
s"${blacklistedDirs.length} of ${localDirs.length} local directories " +
s"is blacklisted.")
}
newDir = new File(localDirs(goodDirId), "%02x".format(subDirId))
Files.createDirectories(newDir.toPath)
subDirs(goodDirId)(subDirId) = newDir
if (goodDirId != dirId) {
migratedDirIdIndex.put(filename, goodDirId)
}
} catch {
case e: IOException =>
logError(s"Failed to look up file $filename on attempt $attempt", e)
if (goodDirId >= 0) {
blacklistedDirs.synchronized {
if (!blacklistedDirs.contains(localDirs(goodDirId))) {
blacklistedDirs += localDirs(goodDirId)
}
dirToBlacklistExpiryTime.put(localDirs(goodDirId), now + blacklistTimeout)
}
}
mostRecentFailure = e
newDir = null
}
}
subDirs(dirId)(subDirId) = newDir
newDir
Option(newDir).getOrElse(throw mostRecentFailure)
}
}

new File(subDir, filename)
}

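The directory/sub-directory hashing that `getFile` relies on is unchanged by this patch. The sketch below is illustrative only: the directory paths and file name are made up, and `math.abs(hashCode)` stands in for `Utils.nonNegativeHash`. It shows how a block file name resolves to a local dir and a "%02x" sub-directory before any blacklist fallback kicks in.

```scala
val localDirs = Array("/data1/blockmgr-0", "/data2/blockmgr-1") // hypothetical spark.local.dir roots
val subDirsPerLocalDir = 64                                     // default of spark.diskStore.subDirectories
val filename = "rdd_7_3"                                        // hypothetical block file name

val hash = math.abs(filename.hashCode)                          // stand-in for Utils.nonNegativeHash
val dirId = hash % localDirs.length                             // which local directory
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir   // which "%02x" sub-directory

println(s"${localDirs(dirId)}/${"%02x".format(subDirId)}/$filename")
```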
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -17,13 +17,14 @@

package org.apache.spark.storage

import java.io.{File, FileWriter}
import java.util.UUID
import java.io.{File, FileWriter, IOException}
import java.nio.file.Files

import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.Utils
import org.apache.spark.internal.config
import org.apache.spark.util.{ManualClock, Utils}

class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
private val testConf = new SparkConf(false)
@@ -91,4 +92,86 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
for (i <- 0 until numBytes) writer.write(i)
writer.close()
}

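/**
 * Recursively sets read/write permissions under `root`. With `tailRecursion` enabled a
 * directory's permissions are changed before recursing into its children (needed when
 * restoring access); otherwise children are handled first (needed when revoking access,
 * since an unreadable directory can no longer be listed).
 */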
def setPermissionRecursively(root: File,
writable: Boolean, readable: Boolean, tailRecursion: Boolean): Unit = {
if (root.isDirectory) {
if (tailRecursion) {
root.setWritable(writable)
root.setReadable(readable)
root.listFiles().foreach(setPermissionRecursively(_, writable, readable, tailRecursion))
} else {
root.listFiles().foreach(setPermissionRecursively(_, writable, readable, tailRecursion))
root.setWritable(writable)
root.setReadable(readable)
}
} else {
root.setWritable(writable)
root.setReadable(readable)
}
}

test(s"test blacklisting bad disk directory") {
for ((badDiskDir, goodDiskDir) <- Seq((rootDir0, rootDir1), (rootDir1, rootDir0))) {
val blockId1 = TestBlockId("1")
val blockId2 = TestBlockId("2")
val blockId3 = TestBlockId("3")

val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
conf.set(config.DISK_STORE_BLACKLIST_TIMEOUT.key, "10000")
val manualClock = new ManualClock(10000L)
val diskBlockManager = new DiskBlockManager(conf, true, manualClock)

// Getting a file succeeds when no disk has gone bad
val file1 = diskBlockManager.getFile(blockId1)
assert(file1 != null)
writeToFile(file1, 10)
assert(Files.readAllBytes(file1.toPath).length === 10)

// Revoke read/write permissions under badDiskDir to simulate a broken disk
diskBlockManager.localDirs.filter(_.getParentFile == badDiskDir)
.foreach(setPermissionRecursively(_, false, false, false))

// Getting a new file still succeeds when a single disk is broken
try {
val file2 = diskBlockManager.getFile(blockId2)
val rootDirOfFile2 = file2.getParentFile.getParentFile.getParentFile
assert(file2 != null && file2.getParentFile.exists() && rootDirOfFile2 === goodDiskDir)
if (diskBlockManager.blacklistedDirs.nonEmpty) {
assert(diskBlockManager.blacklistedDirs.size === 1)
assert(diskBlockManager.blacklistedDirs.exists(_.getParentFile === badDiskDir))
assert(diskBlockManager.dirToBlacklistExpiryTime.size === 1)
assert(diskBlockManager.dirToBlacklistExpiryTime.exists { case (f, expireTime) =>
f.getParentFile === badDiskDir && expireTime === 20000
})
// If the block was migrated to a new good directory, a fresh (not yet existing) file is returned
assert(!file2.exists())
}

// getFile returns the same result for an already-resolved block after blacklisting.
// If the file lives in the bad directory, reading it fails; otherwise reading returns
// exactly the same content as before.
val file3 = diskBlockManager.getFile(blockId1)
assert(file3 === file1)
if (file1.getParentFile.getParentFile.getParentFile === goodDiskDir) {
assert(Files.readAllBytes(file3.toPath).length === 10)
} else {
intercept[IOException](Files.readAllBytes(file3.toPath))
}
} finally {
diskBlockManager.localDirs.filter(_.getParentFile == badDiskDir)
.foreach(setPermissionRecursively(_, true, true, true))
}

manualClock.advance(10000)

// The blacklist is refreshed when getting a file for a new block. The bad disk directory
// has been fixed and its timeout has expired, so the blacklist should now be empty.
assert(diskBlockManager.getFile(blockId3) != null)
assert(diskBlockManager.blacklistedDirs.isEmpty)
assert(diskBlockManager.dirToBlacklistExpiryTime.isEmpty)
diskBlockManager.stop()
}
}
}
14 changes: 14 additions & 0 deletions docs/configuration.md
@@ -1249,6 +1249,20 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>

### Disk Management

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.diskStore.blacklist.timeoutMs</code></td>
<td>1d</td>
<td>
How long a bad local directory stays blacklisted for the disk store before it is unconditionally
removed from the blacklist so that storing new files there can be attempted again.
</td>
</tr>
</table>
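
As a rough illustration (the values below are arbitrary, not recommendations), both new keys can be set like any other Spark configuration, for example programmatically:

```scala
import org.apache.spark.SparkConf

// Hypothetical settings: re-check a blacklisted directory after one hour and allow up to
// five directory attempts per block file lookup.
val conf = new SparkConf()
  .set("spark.diskStore.blacklist.timeoutMs", "1h")
  .set("spark.diskStore.maxAttempts", "5")
```

or on the command line via `--conf spark.diskStore.blacklist.timeoutMs=1h`.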

### Execution Behavior

<table class="table">