Closed

Changes from 3 commits
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -284,6 +284,16 @@ package object config {
.intConf
.createWithDefault(64)

private[spark] val DISK_STORE_BLACKLIST_TIMEOUT =
ConfigBuilder("spark.diskStore.blacklist.timeout")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("1d")

private[spark] val DISK_STORE_MAX_RETRIES =
ConfigBuilder("spark.diskStore.maxRetries")
.intConf
.createWithDefault(3)

private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH =
ConfigBuilder("spark.block.failures.beforeLocationRefresh")
.doc("Max number of failures before this block manager refreshes " +
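For reference, a minimal usage sketch (not part of the PR) of how an application might override the two new properties above; the key names and their defaults (3 and "1d") come from the entries in this file, while the application name and the override values are illustrative assumptions.

import org.apache.spark.SparkConf

object DiskStoreBlacklistConfigSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; only the property names are taken from the diff above.
    val conf = new SparkConf()
      .setAppName("disk-blacklist-demo")               // illustrative application name
      .set("spark.diskStore.maxRetries", "5")          // default: 3
      .set("spark.diskStore.blacklist.timeout", "2h")  // default: "1d"; accepts Spark time strings
    println(conf.toDebugString)
  }
}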
69 changes: 51 additions & 18 deletions core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.storage
import java.io.{File, IOException}
import java.util.UUID

import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkConf
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.internal.{config, Logging}
@@ -48,33 +50,64 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

// Local directories that recently failed an I/O operation; they are skipped until their
// blacklist period expires.
private val badDirs = ArrayBuffer[File]()
private val maxRetries = conf.get(config.DISK_STORE_MAX_RETRIES)
// The config value is in seconds; convert once to milliseconds for comparison with
// System.currentTimeMillis().
private val blacklistTimeoutMs = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT) * 1000L
private val dirToBlacklistExpiryTime = new HashMap[File, Long]

private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
var mostRecentFailure: IOException = null
// Remove directories whose blacklist period has expired so they become eligible again.
val now = System.currentTimeMillis()
badDirs.synchronized {
  val unblacklisted = badDirs.filter(dir => now > dirToBlacklistExpiryTime(dir))
  unblacklisted.foreach { dir =>
    badDirs -= dir
    dirToBlacklistExpiryTime.remove(dir)
  }
}

new File(subDir, filename)
for (attempt <- 0 until maxRetries) {
val goodDirs = badDirs.synchronized { localDirs.filterNot(badDirs.contains(_)) }
if (goodDirs.isEmpty) {
throw new IOException("No good disk directories available")
}
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % goodDirs.length
val subDirId = (hash / goodDirs.length) % subDirsPerLocalDir
try {
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(goodDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
return new File(subDir, filename)
} catch {
case e: IOException =>
logError(s"Failed to looking up file $filename in attempt $attempt", e)
badDirs.synchronized {
badDirs += goodDirs(dirId)
dirToBlacklistExpiryTime.put(goodDirs(dirId), now + blacklistTimeout)
}
mostRecentFailure = e
}
}
throw mostRecentFailure
}

def getFile(blockId: BlockId): File = getFile(blockId.name)
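One property of the lookup change above worth noting: because dirId is computed modulo the number of currently good directories, the directory a given file name maps to can shift once a disk has been blacklisted (the behaviour the author later mentions fixing in the review thread below). A minimal, self-contained sketch of that arithmetic, using made-up directory names and a made-up hash value:

object BlacklistMappingSketch extends App {
  // Made-up values for illustration only.
  val allDirs  = Vector("dir0", "dir1", "dir2")
  val goodDirs = allDirs.filterNot(_ == "dir1")   // pretend dir1 got blacklisted
  val hash = 2
  println(allDirs(hash % allDirs.length))         // dir2 -- where the block was originally placed
  println(goodDirs(hash % goodDirs.length))       // dir0 -- where a later lookup would now go
}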
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -91,4 +91,24 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll
for (i <- 0 until numBytes) writer.write(i)
writer.close()
}

test("test blacklisting bad disk directory") {
val blockId = new TestBlockId("1")
val hash = Utils.nonNegativeHash(blockId.name)
val (badDiskDir, goodDiskDir) = if (hash % 2 == 0) {
Contributor (attilapiros):

I guess the purpose of this randomisation when choosing goodDiskDir/badDiskDir from the root dirs is to vary, from run to run, whether the good dir is the one diskBlockManager would pick first.

I would strongly suggest avoiding this: in some runs the feature is effectively not tested, and an introduced bug might only be detected while testing an unrelated change.

Author:

@attilapiros good suggestion, I have already updated the UT and also fixed a bug: when getFile is called for a block written before the disk corruption, it would return a different result.

(rootDir0, rootDir1)
} else {
(rootDir1, rootDir0)
}

// Delete one root dir to simulate a disk error
Utils.deleteRecursively(badDiskDir)
try {
val file = diskBlockManager.getFile(blockId)
val fileRootDir = file.getParentFile.getParentFile.getParentFile
assert(file != null && file.getParentFile.exists() && fileRootDir === goodDiskDir)
} finally {
badDiskDir.mkdirs()
}
}
}
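In line with the reviewer's suggestion above, one way to make the ordering deterministic is to generate one test per ordering instead of deriving it from the block-name hash. This is only a sketch, under the assumption that the suite re-creates diskBlockManager in beforeEach so each generated test starts with an empty blacklist; rootDir0, rootDir1, TestBlockId, and Utils are the names already used in the test above.

// Sketch: register both orderings explicitly so every run covers the case where the bad
// directory would be hashed to first as well as the case where it would not.
Seq((0, 1), (1, 0)).foreach { case (badIdx, goodIdx) =>
  test(s"blacklisting bad disk directory (bad dir = rootDir$badIdx)") {
    val dirs = IndexedSeq(rootDir0, rootDir1)
    val (badDiskDir, goodDiskDir) = (dirs(badIdx), dirs(goodIdx))
    val blockId = new TestBlockId("1")
    // Delete one root dir to simulate a disk error.
    Utils.deleteRecursively(badDiskDir)
    try {
      val file = diskBlockManager.getFile(blockId)
      val fileRootDir = file.getParentFile.getParentFile.getParentFile
      assert(file.getParentFile.exists() && fileRootDir === goodDiskDir)
    } finally {
      badDiskDir.mkdirs()
    }
  }
}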