diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 71b0df404668..3bc39d010860 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -284,6 +284,18 @@ package object config {
     .intConf
     .createWithDefault(64)

+  private[spark] val DISK_STORE_BLACKLIST_TIMEOUT =
+    ConfigBuilder("spark.diskStore.blacklist.timeoutMs")
+      .doc("The time in milliseconds to wait before moving a blacklisted local directory " +
+        "back out of the blacklist.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1d")
+
+  private[spark] val DISK_STORE_MAX_ATTEMPTS =
+    ConfigBuilder("spark.diskStore.maxAttempts")
+      .intConf
+      .createWithDefault(3)
+
   private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH =
     ConfigBuilder("spark.block.failures.beforeLocationRefresh")
       .doc("Max number of failures before this block manager refreshes " +
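Not part of the patch: a minimal sketch of how these two new entries could be set and read from Spark-internal code. The object name and the "6h"/"5" values are made up for illustration; the typed `conf.get(ConfigEntry)` accessor is `private[spark]`, so the example lives under an `org.apache.spark` package.

```scala
package org.apache.spark.storage

import org.apache.spark.SparkConf
import org.apache.spark.internal.config

// Hypothetical example object, only to show how the new entries are resolved.
object DiskStoreConfExample {
  def resolvedSettings(conf: SparkConf): (Long, Int) = {
    // Override the defaults ("1d" and 3); the timeout accepts time strings such as "30m" or "6h".
    conf.set(config.DISK_STORE_BLACKLIST_TIMEOUT.key, "6h")
    conf.set(config.DISK_STORE_MAX_ATTEMPTS.key, "5")
    // Read back the resolved values; the time config is returned in milliseconds.
    (conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT), conf.get(config.DISK_STORE_MAX_ATTEMPTS))
  }
}
```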
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 95ce4b0f09f5..44ea367b0664 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -18,12 +18,17 @@ package org.apache.spark.storage

 import java.io.{File, IOException}
+import java.nio.file.Files
 import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, HashMap}

 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}

 /**
  * Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -32,9 +37,12 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  * Block files are hashed among the directories listed in spark.local.dir (or in
  * SPARK_LOCAL_DIRS, if it's set).
  */
-private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf,
+    deleteFilesOnStop: Boolean, clock: Clock = new SystemClock()) extends Logging {

   private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES)
+  private[spark] val maxAttempts = conf.get(config.DISK_STORE_MAX_ATTEMPTS)
+  private[spark] val blacklistTimeout = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT)

   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -48,32 +56,77 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

+  private[spark] val blacklistedDirs = ArrayBuffer[File]()
+  private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long]
+  // Maps a file name to the index of the healthy local directory it was migrated to after its
+  // original directory turned bad. This map is expected to stay small enough to hold in memory.
+  private[spark] val migratedDirIdIndex = new ConcurrentHashMap[String, Int].asScala
+
   private val shutdownHook = addShutdownHook()

   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
   def getFile(filename: String): File = {
+    var mostRecentFailure: Exception = null
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
-    val dirId = hash % localDirs.length
+    val dirId = migratedDirIdIndex.getOrElse(filename, hash % localDirs.length)
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

     // Create the subdirectory if it doesn't already exist
     val subDir = subDirs(dirId).synchronized {
+      // Remove directories whose blacklist period has expired
+      val now = clock.getTimeMillis()
+      blacklistedDirs.synchronized {
+        val unblacklisted = blacklistedDirs.filter(now >= dirToBlacklistExpiryTime(_))
+        unblacklisted.foreach { dir =>
+          blacklistedDirs -= dir
+          dirToBlacklistExpiryTime.remove(dir)
+        }
+      }
+
       val old = subDirs(dirId)(subDirId)
       if (old != null) {
         old
       } else {
-        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && !newDir.mkdir()) {
-          throw new IOException(s"Failed to create local dir in $newDir.")
+        assert(!migratedDirIdIndex.contains(filename))
+        var newDir: File = null
+        for (attempt <- 0 until maxAttempts if newDir == null) {
+          val goodDirId = blacklistedDirs.synchronized {
+            val isBlacklisted = blacklistedDirs.contains(localDirs(dirId))
+            if (isBlacklisted) {
+              localDirs.indexWhere(!blacklistedDirs.contains(_))
+            } else {
+              dirId
+            }
+          }
+          try {
+            if (goodDirId < 0) {
+              throw new IOException(s"Cannot store file $filename anywhere because " +
+                s"${blacklistedDirs.length} of ${localDirs.length} local directories " +
+                s"are blacklisted.")
+            }
+            newDir = new File(localDirs(goodDirId), "%02x".format(subDirId))
+            Files.createDirectories(newDir.toPath)
+            subDirs(goodDirId)(subDirId) = newDir
+            if (goodDirId != dirId) {
+              migratedDirIdIndex.put(filename, goodDirId)
+            }
+          } catch {
+            case e: IOException =>
+              logError(s"Failed to look up file $filename in attempt $attempt", e)
+              blacklistedDirs.synchronized {
+                blacklistedDirs += localDirs(dirId)
+                dirToBlacklistExpiryTime.put(localDirs(dirId), now + blacklistTimeout)
+              }
+              mostRecentFailure = e
+              newDir = null
+          }
         }
-        subDirs(dirId)(subDirId) = newDir
-        newDir
+        Option(newDir).getOrElse(throw mostRecentFailure)
       }
     }
-
     new File(subDir, filename)
   }
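Not part of the patch: a simplified, standalone sketch of the bookkeeping that the patched `getFile` performs, shown only to make the expiry/fallback flow easier to follow. The `DirBlacklist` class name is hypothetical; the real logic lives inline in `DiskBlockManager.getFile` above.

```scala
import java.io.File

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Illustrative model of the blacklist bookkeeping; not the actual patch code.
class DirBlacklist(localDirs: Array[File], blacklistTimeoutMs: Long) {
  private val blacklistedDirs = ArrayBuffer[File]()
  private val dirToBlacklistExpiryTime = HashMap[File, Long]()

  // Step 1: drop entries whose blacklist period has expired.
  def pruneExpired(nowMs: Long): Unit = blacklistedDirs.synchronized {
    val expired = blacklistedDirs.filter(dir => nowMs >= dirToBlacklistExpiryTime(dir))
    expired.foreach { dir =>
      blacklistedDirs -= dir
      dirToBlacklistExpiryTime.remove(dir)
    }
  }

  // Step 2: keep the hashed directory if it is healthy, otherwise fall back to the first
  // non-blacklisted directory; -1 means every local directory is currently blacklisted.
  def selectDirId(hashedDirId: Int): Int = blacklistedDirs.synchronized {
    if (blacklistedDirs.contains(localDirs(hashedDirId))) {
      localDirs.indexWhere(dir => !blacklistedDirs.contains(dir))
    } else {
      hashedDirId
    }
  }

  // Step 3: on an IOException, blacklist the directory until nowMs + the configured timeout.
  def blacklist(dirId: Int, nowMs: Long): Unit = blacklistedDirs.synchronized {
    blacklistedDirs += localDirs(dirId)
    dirToBlacklistExpiryTime.put(localDirs(dirId), nowMs + blacklistTimeoutMs)
  }
}
```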
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 0c4f3c48ef80..b82d7c2ab9c5 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -17,13 +17,14 @@ package org.apache.spark.storage

-import java.io.{File, FileWriter}
-import java.util.UUID
+import java.io.{File, FileWriter, IOException}
+import java.nio.file.Files

 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.config
+import org.apache.spark.util.{ManualClock, Utils}

 class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
@@ -91,4 +92,86 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     for (i <- 0 until numBytes) writer.write(i)
     writer.close()
   }
+
+  def setPermissionRecursively(root: File,
+      writable: Boolean, readable: Boolean, tailRecursion: Boolean): Unit = {
+    if (root.isDirectory) {
+      if (tailRecursion) {
+        root.setWritable(writable)
+        root.setReadable(readable)
+        root.listFiles().foreach(setPermissionRecursively(_, writable, readable, tailRecursion))
+      } else {
+        root.listFiles().foreach(setPermissionRecursively(_, writable, readable, tailRecursion))
+        root.setWritable(writable)
+        root.setReadable(readable)
+      }
+    } else {
+      root.setWritable(writable)
+      root.setReadable(readable)
+    }
+  }
+
+  test("blacklisting a bad disk directory") {
+    for ((badDiskDir, goodDiskDir) <- Seq((rootDir0, rootDir1), (rootDir1, rootDir0))) {
+      val blockId1 = TestBlockId("1")
+      val blockId2 = TestBlockId("2")
+      val blockId3 = TestBlockId("3")
+
+      val conf = testConf.clone
+      conf.set("spark.local.dir", rootDirs)
+      conf.set(config.DISK_STORE_BLACKLIST_TIMEOUT.key, "10000")
+      val manualClock = new ManualClock(10000L)
+      val diskBlockManager = new DiskBlockManager(conf, true, manualClock)
+
+      // Getting a file succeeds while no disk has gone bad
+      val file1 = diskBlockManager.getFile(blockId1)
+      assert(file1 != null)
+      writeToFile(file1, 10)
+      assert(Files.readAllBytes(file1.toPath).length === 10)
+
+      // Revoke write/read permissions under badDiskDir to simulate a broken disk
+      diskBlockManager.localDirs.filter(_.getParentFile == badDiskDir)
+        .foreach(setPermissionRecursively(_, false, false, false))
+
+      // Getting a new file still succeeds when a single disk is broken
+      try {
+        val file2 = diskBlockManager.getFile(blockId2)
+        val rootDirOfFile2 = file2.getParentFile.getParentFile.getParentFile
+        assert(file2 != null && file2.getParentFile.exists() && rootDirOfFile2 === goodDiskDir)
+        if (diskBlockManager.blacklistedDirs.nonEmpty) {
+          assert(diskBlockManager.blacklistedDirs.size === 1)
+          assert(diskBlockManager.blacklistedDirs.exists(_.getParentFile === badDiskDir))
+          assert(diskBlockManager.dirToBlacklistExpiryTime.size === 1)
+          assert(diskBlockManager.dirToBlacklistExpiryTime.exists { case (f, expireTime) =>
+            f.getParentFile === badDiskDir && expireTime === 20000
+          })
+          // When the block was migrated to a new good directory, a fresh (empty) file is provided
+          assert(!file2.exists())
+        }
+
+        // getFile returns the same file for an existing block after blacklisting. If that file
+        // lives in the bad directory, reading it fails; otherwise reading returns exactly the
+        // same content as before.
+        val file3 = diskBlockManager.getFile(blockId1)
+        assert(file3 === file1)
+        if (file1.getParentFile.getParentFile.getParentFile === goodDiskDir) {
+          assert(Files.readAllBytes(file3.toPath).length === 10)
+        } else {
+          intercept[IOException](Files.readAllBytes(file3.toPath))
+        }
+      } finally {
+        diskBlockManager.localDirs.filter(_.getParentFile == badDiskDir)
+          .foreach(setPermissionRecursively(_, true, true, true))
+      }
+
+      manualClock.advance(10000)
+
+      // The blacklist is refreshed when getting a file for a new block. The bad directory has
+      // been fixed and its blacklist timeout has elapsed, so the blacklist should now be empty.
+      assert(diskBlockManager.getFile(blockId3) != null)
+      assert(diskBlockManager.blacklistedDirs.isEmpty)
+      assert(diskBlockManager.dirToBlacklistExpiryTime.isEmpty)
+      diskBlockManager.stop()
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 7d3bbf93ae96..a05e9368989c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1249,6 +1249,20 @@ Apart from these, the following properties are also available, and may be useful
+
+### Disk Management
+
+| Property Name | Default | Meaning |
+|---|---|---|
+| `spark.diskStore.blacklist.timeoutMs` | 1d | How long a bad local directory stays blacklisted for the disk store before it is unconditionally removed from the blacklist so that storing new files in it can be attempted again. |
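Not part of the patch: since the entry is declared with `timeConf` above, the value accepts the usual Spark time strings. A hypothetical override from application code, using arbitrary example values:

```scala
import org.apache.spark.SparkConf

object DiskStoreTuningExample {
  def main(args: Array[String]): Unit = {
    // Shorten the quarantine period and allow one extra attempt; "30m" and "4" are
    // illustrative values, not recommendations.
    val conf = new SparkConf()
      .set("spark.diskStore.blacklist.timeoutMs", "30m")
      .set("spark.diskStore.maxAttempts", "4")
    println(conf.get("spark.diskStore.blacklist.timeoutMs")) // prints "30m"
  }
}
```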