diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index ee43b76e1701..f2113947f6bf 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import java.io.{File, IOException}
 import java.util.UUID
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
@@ -119,38 +117,20 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   /** Produces a unique block id and File suitable for storing local intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
-    var blockId = TempLocalBlockId(UUID.randomUUID())
-    var tempLocalFile = getFile(blockId)
-    var count = 0
-    while (!canCreateFile(tempLocalFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
-      blockId = TempLocalBlockId(UUID.randomUUID())
-      tempLocalFile = getFile(blockId)
-      count += 1
+    var blockId = new TempLocalBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempLocalBlockId(UUID.randomUUID())
     }
-    (blockId, tempLocalFile)
+    (blockId, getFile(blockId))
   }
 
   /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
   def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
-    var blockId = TempShuffleBlockId(UUID.randomUUID())
-    var tempShuffleFile = getFile(blockId)
-    var count = 0
-    while (!canCreateFile(tempShuffleFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
-      blockId = TempShuffleBlockId(UUID.randomUUID())
-      tempShuffleFile = getFile(blockId)
-      count += 1
-    }
-    (blockId, tempShuffleFile)
-  }
-
-  private def canCreateFile(file: File): Boolean = {
-    try {
-      file.createNewFile()
-    } catch {
-      case NonFatal(_) =>
-        logError("Failed to create temporary block file: " + file.getAbsoluteFile)
-        false
+    var blockId = new TempShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
+    (blockId, getFile(blockId))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9f332ba60808..c7db2127a6f0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -95,7 +95,7 @@ private[spark] object Utils extends Logging {
    */
   val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt
 
-  val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+  private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
   /** Scheme used for files that are locally available on worker nodes in the cluster. */
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index ccc525e85483..c757dee43808 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -51,7 +51,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   override def beforeEach(): Unit = {
     super.beforeEach()
     val conf = testConf.clone
-    conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
+    conf.set("spark.local.dir", rootDirs)
     diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
   }
 
@@ -90,45 +90,4 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     for (i <- 0 until numBytes) writer.write(i)
     writer.close()
   }
-
-  test("temporary shuffle/local file should be able to handle disk failures") {
-    try {
-      // the following two lines pre-create subdirectories under each root dir of block manager
-      diskBlockManager.getFile("1")
-      diskBlockManager.getFile("2")
-
-      val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
-      assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists")
-      assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists")
-
-      // partial disks damaged
-      rootDir0.setExecutable(false)
-      val tempShuffleFile2 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile2 = diskBlockManager.createTempLocalBlock()._2
-      // It's possible that after 10 retries we still not able to find the healthy disk. we need to
-      // remove the flakiness of these two asserts
-      if (tempShuffleFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
-        assert(tempShuffleFile2.exists(),
-          "There is only one bad disk, so temp shuffle file should be created")
-      }
-      if (tempLocalFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
-        assert(tempLocalFile2.exists(),
-          "There is only one bad disk, so temp local file should be created")
-      }
-
-      // all disks damaged
-      rootDir1.setExecutable(false)
-      val tempShuffleFile3 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile3 = diskBlockManager.createTempLocalBlock()._2
-      assert(!tempShuffleFile3.exists(),
-        "All disks are broken, so there should be no temp shuffle file created")
-      assert(!tempLocalFile3.exists(),
-        "All disks are broken, so there should be no temp local file created")
-    } finally {
-      rootDir0.setExecutable(true)
-      rootDir1.setExecutable(true)
-    }
-
-  }
 }
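
Note: the hunks above revert the temp-block creation path to its earlier form. Instead of probing each candidate path with createNewFile() and giving up after MAX_DIR_CREATION_ATTEMPTS retries, the restored code simply draws fresh random UUIDs until the mapped file does not already exist, and creates no file eagerly. A minimal standalone sketch of that pattern follows; TempBlockSketch, tempDir, and fileFor are hypothetical stand-ins for DiskBlockManager and its getFile method, not Spark API:

import java.io.File
import java.util.UUID

object TempBlockSketch {
  // Hypothetical stand-in for DiskBlockManager.getFile: maps a block id to a path.
  private val tempDir = new File(System.getProperty("java.io.tmpdir"))
  private def fileFor(id: UUID): File = new File(tempDir, s"temp_local_$id")

  // Mirrors the restored createTempLocalBlock loop: regenerate the UUID until
  // the target path is unused. The file itself is not created here; the
  // caller is expected to create and write it.
  def createTempBlock(): (UUID, File) = {
    var id = UUID.randomUUID()
    while (fileFor(id).exists()) {
      id = UUID.randomUUID()
    }
    (id, fileFor(id))
  }

  def main(args: Array[String]): Unit = {
    val (id, file) = createTempBlock()
    println(s"caller would now write block $id to ${file.getAbsolutePath}")
  }
}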