From c8cdde56b7f2a93bc06202fb63bc58bb4a3326b8 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Tue, 22 Jan 2019 19:40:15 +0800 Subject: [PATCH 1/9] Handle bad disk in DiskBlockManager --- .../spark/internal/config/package.scala | 10 +++ .../spark/storage/DiskBlockManager.scala | 62 +++++++++++++------ .../spark/storage/DiskBlockManagerSuite.scala | 20 ++++++ 3 files changed, 74 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 71b0df404668..34d4829de369 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -284,6 +284,16 @@ package object config { .intConf .createWithDefault(64) + private[spark] val DISK_STORE_BLACKLIST_TIMEOUT = + ConfigBuilder("spark.diskStore.blacklist.timeout") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("1d") + + private[spark] val DISK_STORE_MAX_RETIRES = + ConfigBuilder("spark.diskStore.maxRetries") + .intConf + .createWithDefault(3) + private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH = ConfigBuilder("spark.block.failures.beforeLocationRefresh") .doc("Max number of failures before this block manager refreshes " + diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 95ce4b0f09f5..a1bc00e1e811 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -20,6 +20,8 @@ package org.apache.spark.storage import java.io.{File, IOException} import java.util.UUID +import scala.collection.mutable.{ArrayBuffer, HashMap} + import org.apache.spark.SparkConf import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{config, Logging} @@ -48,33 +50,57 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + private val badDirs = ArrayBuffer[File]() + private val maxRetries = conf.get(config.DISK_STORE_MAX_RETIRES) + private val blacklistTimeout = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT) + private val dirToBlacklistExpiryTime = new HashMap[File, Long] + private val shutdownHook = addShutdownHook() /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). 
def getFile(filename: String): File = { - // Figure out which local directory it hashes to, and which subdirectory in that - val hash = Utils.nonNegativeHash(filename) - val dirId = hash % localDirs.length - val subDirId = (hash / localDirs.length) % subDirsPerLocalDir - - // Create the subdirectory if it doesn't already exist - val subDir = subDirs(dirId).synchronized { - val old = subDirs(dirId)(subDirId) - if (old != null) { - old - } else { - val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) - if (!newDir.exists() && !newDir.mkdir()) { - throw new IOException(s"Failed to create local dir in $newDir.") + var mostRecentFailure: Exception = null + // Update blacklist + val now = System.currentTimeMillis() + badDirs.dropWhile(now > dirToBlacklistExpiryTime(_)) + + for (attempt <- 0 until maxRetries) { + val goodDirs = localDirs.filterNot(badDirs.contains(_)) + if (goodDirs.isEmpty) { + throw new IOException("No good disk directories available") + } + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % goodDirs.length + val subDirId = (hash / goodDirs.length) % subDirsPerLocalDir + try { + // Create the subdirectory if it doesn't already exist + val subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val newDir = new File(goodDirs(dirId), "%02x".format(subDirId)) + if (!newDir.exists() && !newDir.mkdir()) { + throw new IOException(s"Failed to create local dir in $newDir.") + } + subDirs(dirId)(subDirId) = newDir + newDir + } } - subDirs(dirId)(subDirId) = newDir - newDir + return new File(subDir, filename) + } catch { + case e: IOException => + logError(s"Failed to looking up file $filename in attempt $attempt", e) + badDirs += goodDirs(dirId) + dirToBlacklistExpiryTime.put(goodDirs(dirId), + System.currentTimeMillis() + blacklistTimeout) + mostRecentFailure = e } } - - new File(subDir, filename) + throw mostRecentFailure } def getFile(blockId: BlockId): File = getFile(blockId.name) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 0c4f3c48ef80..32a1b72873ab 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -91,4 +91,24 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B for (i <- 0 until numBytes) writer.write(i) writer.close() } + + test("test blacklisting bad disk directory") { + val blockId = new TestBlockId("1") + val hash = Utils.nonNegativeHash(blockId) + val (badDiskDir, goodDiskDir) = if (hash % rootDirs.length == 0) { + (rootDir0, rootDir1) + } else { + (rootDir1, rootDir0) + } + + // Delete dirs to simulate disk error + Utils.deleteRecursively(badDiskDir) + try { + val file = diskBlockManager.getFile(blockId) + val fileRootDir = file.getParentFile.getParentFile.getParentFile + assert(file != null && file.getParentFile.exists() && fileRootDir === goodDiskDir) + } finally { + badDiskDir.mkdirs() + } + } } From 713c6dae06e1b813ff4548dde8272ef3b3e8b979 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Wed, 30 Jan 2019 19:46:19 +0800 Subject: [PATCH 2/9] Fix --- .../apache/spark/storage/DiskBlockManager.scala | 17 +++++++++++++---- .../spark/storage/DiskBlockManagerSuite.scala | 4 ++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index a1bc00e1e811..ac76acfcaba8 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -19,10 +19,12 @@ package org.apache.spark.storage import java.io.{File, IOException} import java.util.UUID +import javax.annotation.concurrent.ThreadSafe import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark.SparkConf + import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{config, Logging} import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -64,7 +66,13 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea var mostRecentFailure: Exception = null // Update blacklist val now = System.currentTimeMillis() - badDirs.dropWhile(now > dirToBlacklistExpiryTime(_)) + val unblacklisted = badDirs.filter(now > dirToBlacklistExpiryTime(_)) + badDirs.synchronized { + unblacklisted.foreach { dir => + badDirs -= dir + dirToBlacklistExpiryTime.remove(dir) + } + } for (attempt <- 0 until maxRetries) { val goodDirs = localDirs.filterNot(badDirs.contains(_)) @@ -94,9 +102,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea } catch { case e: IOException => logError(s"Failed to looking up file $filename in attempt $attempt", e) - badDirs += goodDirs(dirId) - dirToBlacklistExpiryTime.put(goodDirs(dirId), - System.currentTimeMillis() + blacklistTimeout) + badDirs.synchronized { + badDirs += goodDirs(dirId) + dirToBlacklistExpiryTime.put(goodDirs(dirId), now + blacklistTimeout) + } mostRecentFailure = e } } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 32a1b72873ab..de030bfbf515 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -94,8 +94,8 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B test("test blacklisting bad disk directory") { val blockId = new TestBlockId("1") - val hash = Utils.nonNegativeHash(blockId) - val (badDiskDir, goodDiskDir) = if (hash % rootDirs.length == 0) { + val hash = Utils.nonNegativeHash(blockId.name) + val (badDiskDir, goodDiskDir) = if (hash % 2 == 0) { (rootDir0, rootDir1) } else { (rootDir1, rootDir0) From 5a51be5b962956f6c7735227b626c04c3df9ea96 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Wed, 30 Jan 2019 19:50:03 +0800 Subject: [PATCH 3/9] Fix imports --- .../main/scala/org/apache/spark/storage/DiskBlockManager.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ac76acfcaba8..622333795676 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -19,12 +19,10 @@ package org.apache.spark.storage import java.io.{File, IOException} import java.util.UUID -import javax.annotation.concurrent.ThreadSafe import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark.SparkConf - import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{config, Logging} import 
org.apache.spark.util.{ShutdownHookManager, Utils} From f6a71b5e85213f2c337d37437c9b49a720cff690 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Thu, 31 Jan 2019 19:12:43 +0800 Subject: [PATCH 4/9] fix --- .../spark/internal/config/package.scala | 4 +- .../spark/storage/DiskBlockManager.scala | 92 +++++++++++-------- .../spark/storage/DiskBlockManagerSuite.scala | 74 +++++++++++---- 3 files changed, 110 insertions(+), 60 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 34d4829de369..1b7d914f7cf9 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -285,8 +285,8 @@ package object config { .createWithDefault(64) private[spark] val DISK_STORE_BLACKLIST_TIMEOUT = - ConfigBuilder("spark.diskStore.blacklist.timeout") - .timeConf(TimeUnit.SECONDS) + ConfigBuilder("spark.diskStore.blacklist.timeoutMs") + .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1d") private[spark] val DISK_STORE_MAX_RETIRES = diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 622333795676..1ed371ccb4d1 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -19,13 +19,15 @@ package org.apache.spark.storage import java.io.{File, IOException} import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark.SparkConf import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{config, Logging} -import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} /** * Creates and maintains the logical mapping between logical blocks and physical on-disk @@ -34,9 +36,12 @@ import org.apache.spark.util.{ShutdownHookManager, Utils} * Block files are hashed among the directories listed in spark.local.dir (or in * SPARK_LOCAL_DIRS, if it's set). 
*/ -private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging { +private[spark] class DiskBlockManager(conf: SparkConf, + deleteFilesOnStop: Boolean, clock: Clock = new SystemClock()) extends Logging { private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES) + private[spark] val maxRetries = conf.get(config.DISK_STORE_MAX_RETIRES) + private[spark] val blacklistTimeout = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT) /* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid @@ -50,10 +55,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) - private val badDirs = ArrayBuffer[File]() - private val maxRetries = conf.get(config.DISK_STORE_MAX_RETIRES) - private val blacklistTimeout = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT) - private val dirToBlacklistExpiryTime = new HashMap[File, Long] + private[spark] val badDirs = ArrayBuffer[File]() + private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long] + // Filename hash to dirId, it should be small enough to put into memory + private[spark] val migratedDirIdIndex = new ConcurrentHashMap[Int, Int].asScala private val shutdownHook = addShutdownHook() @@ -62,52 +67,61 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). def getFile(filename: String): File = { var mostRecentFailure: Exception = null - // Update blacklist - val now = System.currentTimeMillis() - val unblacklisted = badDirs.filter(now > dirToBlacklistExpiryTime(_)) - badDirs.synchronized { + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = migratedDirIdIndex.getOrElse(hash, hash % localDirs.length) + val subDirId = (hash / localDirs.length) % subDirsPerLocalDir + + // Create the subdirectory if it doesn't already exist + val subDir = subDirs(dirId).synchronized { + // Update blacklist + val now = clock.getTimeMillis() + val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_)) unblacklisted.foreach { dir => badDirs -= dir dirToBlacklistExpiryTime.remove(dir) } - } - for (attempt <- 0 until maxRetries) { - val goodDirs = localDirs.filterNot(badDirs.contains(_)) - if (goodDirs.isEmpty) { - throw new IOException("No good disk directories available") - } - // Figure out which local directory it hashes to, and which subdirectory in that - val hash = Utils.nonNegativeHash(filename) - val dirId = hash % goodDirs.length - val subDirId = (hash / goodDirs.length) % subDirsPerLocalDir - try { - // Create the subdirectory if it doesn't already exist - val subDir = subDirs(dirId).synchronized { - val old = subDirs(dirId)(subDirId) - if (old != null) { - old + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + assert(!migratedDirIdIndex.contains(dirId)) + var succeed = false + var newDir: File = null + for (attempt <- 0 until maxRetries if !succeed) { + val isBlacklisted = badDirs.contains(localDirs(dirId)) + val goodDirId = if (isBlacklisted) { + localDirs.indexWhere(!badDirs.contains(_)) } else { - val newDir = new File(goodDirs(dirId), "%02x".format(subDirId)) + dirId + } + try { + if 
(goodDirId < 0) { + throw new IOException("No good disk directories available") + } + newDir = new File(localDirs(goodDirId), "%02x".format(subDirId)) if (!newDir.exists() && !newDir.mkdir()) { throw new IOException(s"Failed to create local dir in $newDir.") } - subDirs(dirId)(subDirId) = newDir - newDir + subDirs(goodDirId)(subDirId) = newDir + if (goodDirId != dirId) { + migratedDirIdIndex.put(hash, goodDirId) + } + succeed = true + } catch { + case e: IOException => + logError(s"Failed to looking up file $filename in attempt $attempt", e) + badDirs += localDirs(dirId) + dirToBlacklistExpiryTime.put(localDirs(dirId), now + blacklistTimeout) + mostRecentFailure = e } + Option(newDir).getOrElse(throw mostRecentFailure) } - return new File(subDir, filename) - } catch { - case e: IOException => - logError(s"Failed to looking up file $filename in attempt $attempt", e) - badDirs.synchronized { - badDirs += goodDirs(dirId) - dirToBlacklistExpiryTime.put(goodDirs(dirId), now + blacklistTimeout) - } - mostRecentFailure = e + newDir } } - throw mostRecentFailure + new File(subDir, filename) } def getFile(blockId: BlockId): File = getFile(blockId.name) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index de030bfbf515..e3d5361aecd5 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -21,9 +21,10 @@ import java.io.{File, FileWriter} import java.util.UUID import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} - import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.util.Utils + +import org.apache.spark.internal.config +import org.apache.spark.util.{ManualClock, Utils} class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll { private val testConf = new SparkConf(false) @@ -92,23 +93,58 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B writer.close() } - test("test blacklisting bad disk directory") { - val blockId = new TestBlockId("1") - val hash = Utils.nonNegativeHash(blockId.name) - val (badDiskDir, goodDiskDir) = if (hash % 2 == 0) { - (rootDir0, rootDir1) - } else { - (rootDir1, rootDir0) - } - - // Delete dirs to simulate disk error - Utils.deleteRecursively(badDiskDir) - try { - val file = diskBlockManager.getFile(blockId) - val fileRootDir = file.getParentFile.getParentFile.getParentFile - assert(file != null && file.getParentFile.exists() && fileRootDir === goodDiskDir) - } finally { - badDiskDir.mkdirs() + test(s"test blacklisting bad disk directory") { + for ((badDiskDir, goodDiskDir) <- Seq((rootDir0, rootDir1), (rootDir1, rootDir0))) { + val blockId1 = TestBlockId("1") + val blockId2 = TestBlockId("2") + val blockId3 = TestBlockId("3") + + val conf = testConf.clone + conf.set("spark.local.dir", rootDirs) + conf.set(config.DISK_STORE_BLACKLIST_TIMEOUT.key, "10000") + val manualClock = new ManualClock(10000L) + val diskBlockManager = new DiskBlockManager(conf, true, manualClock) + + // Get file succeed when no disk turns bad + val file1 = diskBlockManager.getFile(blockId1) + assert(file1 != null) + + // Delete badDiskDir to simulate disk broken + Utils.deleteRecursively(badDiskDir) + + // Get new file succeed when single disk is broken + try { + val file2 = diskBlockManager.getFile(blockId2) + val rootDirOfFile2 = file2.getParentFile.getParentFile.getParentFile + 
assert(file2 != null && file2.getParentFile.exists() && rootDirOfFile2 === goodDiskDir) + if (diskBlockManager.badDirs.nonEmpty) { + assert(diskBlockManager.badDirs.size === 1) + assert(diskBlockManager.badDirs.exists(_.getParentFile === badDiskDir)) + assert(diskBlockManager.dirToBlacklistExpiryTime.size === 1) + assert(diskBlockManager.dirToBlacklistExpiryTime.exists { case (f, expireTime) => + f.getParentFile === badDiskDir && expireTime === 20000 + }) + } + + // Get file succeed after bad disk blacklisted + val file3 = diskBlockManager.getFile(blockId1) + assert(file1 === file3) + + val file4 = diskBlockManager.getFile(blockId2) + val rootDirOfFile4 = file4.getParentFile.getParentFile.getParentFile + assert(file4 != null && file4.getParentFile.exists() && rootDirOfFile4 === goodDiskDir) + } finally { + diskBlockManager.localDirs.foreach(_.mkdirs()) + } + + manualClock.advance(10000) + + // Update blacklist when getting file for new block + // Bad disk directory is fixed here, so blacklist should be empty + assert(diskBlockManager.getFile(blockId3) != null) + assert(diskBlockManager.badDirs.isEmpty) + assert(diskBlockManager.dirToBlacklistExpiryTime.isEmpty) + diskBlockManager.stop() } } } From 25b13616f8cdef87b1308e3b6b5c9716de2fa357 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Fri, 1 Feb 2019 10:12:50 +0800 Subject: [PATCH 5/9] minor fix --- .../main/scala/org/apache/spark/storage/DiskBlockManager.scala | 3 +-- .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 1ed371ccb4d1..d2293ecb7fa6 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -116,9 +116,8 @@ private[spark] class DiskBlockManager(conf: SparkConf, dirToBlacklistExpiryTime.put(localDirs(dirId), now + blacklistTimeout) mostRecentFailure = e } - Option(newDir).getOrElse(throw mostRecentFailure) } - newDir + Option(newDir).getOrElse(throw mostRecentFailure) } } new File(subDir, filename) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index e3d5361aecd5..78ceed593d2f 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -18,11 +18,10 @@ package org.apache.spark.storage import java.io.{File, FileWriter} -import java.util.UUID import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config import org.apache.spark.util.{ManualClock, Utils} From da9b44287259ac28636a0b040a6e2c0fdd82cab3 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Sat, 2 Feb 2019 18:22:17 +0800 Subject: [PATCH 6/9] Repalce file.mkdir with Files.createDirectories to expose low-level exceptions --- .../scala/org/apache/spark/storage/DiskBlockManager.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index d2293ecb7fa6..1cba85ff279a 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.io.{File, IOException} +import java.nio.file.Files import java.util.UUID import java.util.concurrent.ConcurrentHashMap @@ -101,9 +102,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, throw new IOException("No good disk directories available") } newDir = new File(localDirs(goodDirId), "%02x".format(subDirId)) - if (!newDir.exists() && !newDir.mkdir()) { - throw new IOException(s"Failed to create local dir in $newDir.") - } + Files.createDirectories(newDir.toPath) subDirs(goodDirId)(subDirId) = newDir if (goodDirId != dirId) { migratedDirIdIndex.put(hash, goodDirId) @@ -111,7 +110,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, succeed = true } catch { case e: IOException => - logError(s"Failed to looking up file $filename in attempt $attempt", e) + logError(s"Failed to look up file $filename in attempt $attempt", e) badDirs += localDirs(dirId) dirToBlacklistExpiryTime.put(localDirs(dirId), now + blacklistTimeout) mostRecentFailure = e From 53270bed7aabd669c2f929d7fe9dcd0c5597f062 Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Wed, 6 Feb 2019 22:54:45 +0800 Subject: [PATCH 7/9] Update as commented --- .../spark/storage/DiskBlockManager.scala | 43 +++++++++-------- .../spark/storage/DiskBlockManagerSuite.scala | 48 +++++++++++++++---- 2 files changed, 62 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 1cba85ff279a..41fbe9fb70d7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -58,8 +58,8 @@ private[spark] class DiskBlockManager(conf: SparkConf, private[spark] val badDirs = ArrayBuffer[File]() private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long] - // Filename hash to dirId, it should be small enough to put into memory - private[spark] val migratedDirIdIndex = new ConcurrentHashMap[Int, Int].asScala + // Filename to dirId, it should be small enough to put into memory + private[spark] val migratedDirIdIndex = new ConcurrentHashMap[String, Int].asScala private val shutdownHook = addShutdownHook() @@ -70,32 +70,35 @@ private[spark] class DiskBlockManager(conf: SparkConf, var mostRecentFailure: Exception = null // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) - val dirId = migratedDirIdIndex.getOrElse(hash, hash % localDirs.length) + val dirId = migratedDirIdIndex.getOrElse(filename, hash % localDirs.length) val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // Create the subdirectory if it doesn't already exist val subDir = subDirs(dirId).synchronized { // Update blacklist val now = clock.getTimeMillis() - val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_)) - unblacklisted.foreach { dir => - badDirs -= dir - dirToBlacklistExpiryTime.remove(dir) + badDirs.synchronized { + val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_)) + unblacklisted.foreach { dir => + badDirs -= dir + dirToBlacklistExpiryTime.remove(dir) + } } val old = subDirs(dirId)(subDirId) if (old != null) { old } else { - assert(!migratedDirIdIndex.contains(dirId)) - var succeed = false + 
assert(!migratedDirIdIndex.contains(filename)) var newDir: File = null - for (attempt <- 0 until maxRetries if !succeed) { - val isBlacklisted = badDirs.contains(localDirs(dirId)) - val goodDirId = if (isBlacklisted) { - localDirs.indexWhere(!badDirs.contains(_)) - } else { - dirId + for (attempt <- 0 until maxRetries if newDir == null) { + val goodDirId = badDirs.synchronized { + val isBlacklisted = badDirs.contains(localDirs(dirId)) + if (isBlacklisted) { + localDirs.indexWhere(!badDirs.contains(_)) + } else { + dirId + } } try { if (goodDirId < 0) { @@ -105,15 +108,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, Files.createDirectories(newDir.toPath) subDirs(goodDirId)(subDirId) = newDir if (goodDirId != dirId) { - migratedDirIdIndex.put(hash, goodDirId) + migratedDirIdIndex.put(filename, goodDirId) } - succeed = true } catch { case e: IOException => logError(s"Failed to look up file $filename in attempt $attempt", e) - badDirs += localDirs(dirId) - dirToBlacklistExpiryTime.put(localDirs(dirId), now + blacklistTimeout) + badDirs.synchronized { + badDirs += localDirs(dirId) + dirToBlacklistExpiryTime.put(localDirs(dirId), now + blacklistTimeout) + } mostRecentFailure = e + newDir = null } } Option(newDir).getOrElse(throw mostRecentFailure) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 78ceed593d2f..36693ee7b1b7 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.storage -import java.io.{File, FileWriter} +import java.io.{File, FileWriter, IOException} +import java.nio.file.Files import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -92,6 +93,24 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B writer.close() } + def setPermissionRecursively(root: File, + writable: Boolean, readable: Boolean, tailRecursion: Boolean): Unit = { + if (root.isDirectory) { + if (tailRecursion) { + root.setWritable(writable) + root.setReadable(readable) + root.listFiles().foreach(setPermissionRecursively(_, writable, readable, tailRecursion)) + } else { + root.listFiles().foreach(setPermissionRecursively(_, writable, readable, tailRecursion)) + root.setWritable(writable) + root.setReadable(readable) + } + } else { + root.setWritable(writable) + root.setReadable(readable) + } + } + test(s"test blacklisting bad disk directory") { for ((badDiskDir, goodDiskDir) <- Seq((rootDir0, rootDir1), (rootDir1, rootDir0))) { val blockId1 = TestBlockId("1") @@ -107,9 +126,12 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B // Get file succeed when no disk turns bad val file1 = diskBlockManager.getFile(blockId1) assert(file1 != null) + writeToFile(file1, 10) + assert(Files.readAllBytes(file1.toPath).length === 10) - // Delete badDiskDir to simulate disk broken - Utils.deleteRecursively(badDiskDir) + // Change writable/readable of badDiskDir to simulate disk broken + diskBlockManager.localDirs.filter(_.getParentFile == badDiskDir) + .foreach(setPermissionRecursively(_, false, false, false)) // Get new file succeed when single disk is broken try { @@ -123,17 +145,23 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B assert(diskBlockManager.dirToBlacklistExpiryTime.exists { case (f, expireTime) => f.getParentFile === badDiskDir && 
expireTime === 20000 }) + // If migrated to new good directory, a new file would be provided + assert(!file2.exists()) } - // Get file succeed after bad disk blacklisted + // Get file returns the same result after blacklisting. + // If file is in bad directory, then reading would fail, otherwise, reading returns + // exactly the same result as before. val file3 = diskBlockManager.getFile(blockId1) - assert(file1 === file3) - - val file4 = diskBlockManager.getFile(blockId2) - val rootDirOfFile4 = file4.getParentFile.getParentFile.getParentFile - assert(file4 != null && file4.getParentFile.exists() && rootDirOfFile4 === goodDiskDir) + assert(file3 === file1) + if (file1.getParentFile.getParentFile.getParentFile === goodDiskDir) { + assert(Files.readAllBytes(file3.toPath).length === 10) + } else { + intercept[IOException](Files.readAllBytes(file3.toPath)) + } } finally { - diskBlockManager.localDirs.foreach(_.mkdirs()) + diskBlockManager.localDirs.filter(_.getParentFile == badDiskDir) + .foreach(setPermissionRecursively(_, true, true, true)) } manualClock.advance(10000) From c8d330498b92079945a45f302b86ed1b27bbab17 Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Tue, 12 Feb 2019 21:34:44 +0800 Subject: [PATCH 8/9] document and reword the diskStore related configs --- .../scala/org/apache/spark/internal/config/package.scala | 6 ++++-- .../scala/org/apache/spark/storage/DiskBlockManager.scala | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 1b7d914f7cf9..3bc39d010860 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -286,11 +286,13 @@ package object config { private[spark] val DISK_STORE_BLACKLIST_TIMEOUT = ConfigBuilder("spark.diskStore.blacklist.timeoutMs") + .doc("The timeout in milliseconds to wait before moving blacklisted local directory " + + "out from the blacklist.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1d") - private[spark] val DISK_STORE_MAX_RETIRES = - ConfigBuilder("spark.diskStore.maxRetries") + private[spark] val DISK_STORE_MAX_ATTEMPTS = + ConfigBuilder("spark.diskStore.maxAttempts") .intConf .createWithDefault(3) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 41fbe9fb70d7..513ceb5ef3f0 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -41,7 +41,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean, clock: Clock = new SystemClock()) extends Logging { private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES) - private[spark] val maxRetries = conf.get(config.DISK_STORE_MAX_RETIRES) + private[spark] val maxAttempts = conf.get(config.DISK_STORE_MAX_ATTEMPTS) private[spark] val blacklistTimeout = conf.get(config.DISK_STORE_BLACKLIST_TIMEOUT) /* Create one local directory for each path mentioned in spark.local.dir; then, inside this @@ -91,7 +91,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, } else { assert(!migratedDirIdIndex.contains(filename)) var newDir: File = null - for (attempt <- 0 until maxRetries if newDir == null) { + for (attempt <- 0 until maxAttempts if newDir == null) { val goodDirId = 
badDirs.synchronized { val isBlacklisted = badDirs.contains(localDirs(dirId)) if (isBlacklisted) { From 31b0b9c81c1b81aa4c9360cf11f941d0d443b065 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Wed, 13 Feb 2019 19:49:13 +0800 Subject: [PATCH 9/9] Update docs and reword vars,comments,logs --- .../spark/storage/DiskBlockManager.scala | 25 +++++++++++-------- .../spark/storage/DiskBlockManagerSuite.scala | 8 +++--- docs/configuration.md | 14 +++++++++++ 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 513ceb5ef3f0..44ea367b0664 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -56,9 +56,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) - private[spark] val badDirs = ArrayBuffer[File]() + private[spark] val blacklistedDirs = ArrayBuffer[File]() private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long] - // Filename to dirId, it should be small enough to put into memory + // Migrating to a new good local directory to store the file when a bad one found. + // it should be small enough to put into memory private[spark] val migratedDirIdIndex = new ConcurrentHashMap[String, Int].asScala private val shutdownHook = addShutdownHook() @@ -77,10 +78,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, val subDir = subDirs(dirId).synchronized { // Update blacklist val now = clock.getTimeMillis() - badDirs.synchronized { - val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_)) + blacklistedDirs.synchronized { + val unblacklisted = blacklistedDirs.filter(now >= dirToBlacklistExpiryTime(_)) unblacklisted.foreach { dir => - badDirs -= dir + blacklistedDirs -= dir dirToBlacklistExpiryTime.remove(dir) } } @@ -92,17 +93,19 @@ private[spark] class DiskBlockManager(conf: SparkConf, assert(!migratedDirIdIndex.contains(filename)) var newDir: File = null for (attempt <- 0 until maxAttempts if newDir == null) { - val goodDirId = badDirs.synchronized { - val isBlacklisted = badDirs.contains(localDirs(dirId)) + val goodDirId = blacklistedDirs.synchronized { + val isBlacklisted = blacklistedDirs.contains(localDirs(dirId)) if (isBlacklisted) { - localDirs.indexWhere(!badDirs.contains(_)) + localDirs.indexWhere(!blacklistedDirs.contains(_)) } else { dirId } } try { if (goodDirId < 0) { - throw new IOException("No good disk directories available") + throw new IOException(s"Cannot store file $filename anywhere due to " + + s"${blacklistedDirs.length} of ${localDirs.length} local directories " + + s"is blacklisted.") } newDir = new File(localDirs(goodDirId), "%02x".format(subDirId)) Files.createDirectories(newDir.toPath) @@ -113,8 +116,8 @@ private[spark] class DiskBlockManager(conf: SparkConf, } catch { case e: IOException => logError(s"Failed to look up file $filename in attempt $attempt", e) - badDirs.synchronized { - badDirs += localDirs(dirId) + blacklistedDirs.synchronized { + blacklistedDirs += localDirs(dirId) dirToBlacklistExpiryTime.put(localDirs(dirId), now + blacklistTimeout) } mostRecentFailure = e diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
index 36693ee7b1b7..b82d7c2ab9c5 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -138,9 +138,9 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B val file2 = diskBlockManager.getFile(blockId2) val rootDirOfFile2 = file2.getParentFile.getParentFile.getParentFile assert(file2 != null && file2.getParentFile.exists() && rootDirOfFile2 === goodDiskDir) - if (diskBlockManager.badDirs.nonEmpty) { - assert(diskBlockManager.badDirs.size === 1) - assert(diskBlockManager.badDirs.exists(_.getParentFile === badDiskDir)) + if (diskBlockManager.blacklistedDirs.nonEmpty) { + assert(diskBlockManager.blacklistedDirs.size === 1) + assert(diskBlockManager.blacklistedDirs.exists(_.getParentFile === badDiskDir)) assert(diskBlockManager.dirToBlacklistExpiryTime.size === 1) assert(diskBlockManager.dirToBlacklistExpiryTime.exists { case (f, expireTime) => f.getParentFile === badDiskDir && expireTime === 20000 @@ -169,7 +169,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B // Update blacklist when getting file for new block // Bad disk directory is fixed here, so blacklist should be empty assert(diskBlockManager.getFile(blockId3) != null) - assert(diskBlockManager.badDirs.isEmpty) + assert(diskBlockManager.blacklistedDirs.isEmpty) assert(diskBlockManager.dirToBlacklistExpiryTime.isEmpty) diskBlockManager.stop() } diff --git a/docs/configuration.md b/docs/configuration.md index 7d3bbf93ae96..a05e9368989c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1249,6 +1249,20 @@ Apart from these, the following properties are also available, and may be useful +### Disk Management + + + + + + + + +
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.diskStore.blacklist.timeoutMs</code></td>
+  <td>1d</td>
+  <td>
+    How long a bad local directory stays blacklisted for the disk store before it is unconditionally
+    removed from the blacklist and storing new files in it is attempted again.
+  </td>
+</tr>
+</table>
+ ### Execution Behavior
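
As a usage sketch, assuming a Spark build with the patches above applied, the two configuration keys introduced by this series could be tuned on a SparkConf as shown below; the values are arbitrary examples rather than recommendations.

    import org.apache.spark.SparkConf

    // Sketch only: these keys exist once the patches in this series are applied.
    val conf = new SparkConf()
      .setAppName("disk-blacklist-example")
      // Keep a failing local directory blacklisted for 30 minutes instead of the default 1d.
      .set("spark.diskStore.blacklist.timeoutMs", "30m")
      // Attempt up to 5 local directories before giving up on storing a block file (default: 3).
      .set("spark.diskStore.maxAttempts", "5")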