core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import java.io.{File, IOException}
 import java.util.UUID
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
@@ -119,38 +117,20 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   /** Produces a unique block id and File suitable for storing local intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
-    var blockId = TempLocalBlockId(UUID.randomUUID())
-    var tempLocalFile = getFile(blockId)
-    var count = 0
-    while (!canCreateFile(tempLocalFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
-      blockId = TempLocalBlockId(UUID.randomUUID())
-      tempLocalFile = getFile(blockId)
-      count += 1
+    var blockId = new TempLocalBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempLocalBlockId(UUID.randomUUID())
     }
-    (blockId, tempLocalFile)
+    (blockId, getFile(blockId))
   }
 
   /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
   def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
-    var blockId = TempShuffleBlockId(UUID.randomUUID())
-    var tempShuffleFile = getFile(blockId)
-    var count = 0
-    while (!canCreateFile(tempShuffleFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
-      blockId = TempShuffleBlockId(UUID.randomUUID())
-      tempShuffleFile = getFile(blockId)
-      count += 1
-    }
-    (blockId, tempShuffleFile)
-  }
-
-  private def canCreateFile(file: File): Boolean = {
-    try {
-      file.createNewFile()
-    } catch {
-      case NonFatal(_) =>
-        logError("Failed to create temporary block file: " + file.getAbsoluteFile)
-        false
-    }
-  }
+    var blockId = new TempShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempShuffleBlockId(UUID.randomUUID())
+    }
+    (blockId, getFile(blockId))
+  }
 
   /**
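The revert restores the original allocation pattern on both paths: keep drawing a fresh random UUID while a file for the candidate block id already exists, then hand back the (blockId, file) pair without touching the disk; the file is only created when the caller first writes to it. Below is a minimal standalone sketch of that pattern. The object name, the single flat directory, and the temp_shuffle_ prefix are illustrative assumptions, not code from this PR.

import java.io.File
import java.util.UUID

// Minimal sketch of the restored uniqueness loop, assuming one flat local
// directory instead of DiskBlockManager's hashed subdirectory layout.
object TempBlockSketch {
  private val localDir = new File(System.getProperty("java.io.tmpdir"))

  // Stand-in for DiskBlockManager.getFile: maps a block name to a File.
  private def getFile(name: String): File = new File(localDir, name)

  // Keep drawing UUIDs until no file with that name exists. Only the name's
  // uniqueness is guaranteed here; nothing is written, so an unhealthy disk
  // is no longer detected at allocation time (that was the job of the
  // removed canCreateFile probe).
  def createTempShuffleName(): (String, File) = {
    var name = s"temp_shuffle_${UUID.randomUUID()}"
    while (getFile(name).exists()) {
      name = s"temp_shuffle_${UUID.randomUUID()}"
    }
    (name, getFile(name))
  }

  def main(args: Array[String]): Unit = {
    val (name, file) = createTempShuffleName()
    println(s"unique temp block: $name -> ${file.getAbsolutePath}")
  }
}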
core/src/main/scala/org/apache/spark/util/Utils.scala (2 changes: 1 addition & 1 deletion)

@@ -95,7 +95,7 @@ private[spark] object Utils extends Logging {
    */
  val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt
 
-  val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+  private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
   /** Scheme used for files that are locally available on worker nodes in the cluster. */
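This hunk is a consequence of the revert above rather than an independent change: with the retry loop gone, DiskBlockManager no longer references Utils.MAX_DIR_CREATION_ATTEMPTS, so the constant's visibility can be narrowed back to private; presumably it remains in use only inside Utils itself.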
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala

@@ -51,7 +51,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   override def beforeEach(): Unit = {
     super.beforeEach()
     val conf = testConf.clone
-    conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
+    conf.set("spark.local.dir", rootDirs)
     diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
   }
 
@@ -90,45 +90,4 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     for (i <- 0 until numBytes) writer.write(i)
     writer.close()
   }
-
-  test("temporary shuffle/local file should be able to handle disk failures") {
-    try {
-      // the following two lines pre-create subdirectories under each root dir of block manager
-      diskBlockManager.getFile("1")
-      diskBlockManager.getFile("2")
-
-      val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
-      assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists")
-      assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists")
-
-      // partial disks damaged
-      rootDir0.setExecutable(false)
-      val tempShuffleFile2 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile2 = diskBlockManager.createTempLocalBlock()._2
-      // It's possible that after 10 retries we are still unable to find a healthy disk, so we
-      // need to remove the flakiness of these two asserts.
-      if (tempShuffleFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
-        assert(tempShuffleFile2.exists(),
-          "There is only one bad disk, so temp shuffle file should be created")
-      }
-      if (tempLocalFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
-        assert(tempLocalFile2.exists(),
-          "There is only one bad disk, so temp local file should be created")
-      }
-
-      // all disks damaged
-      rootDir1.setExecutable(false)
-      val tempShuffleFile3 = diskBlockManager.createTempShuffleBlock()._2
-      val tempLocalFile3 = diskBlockManager.createTempLocalBlock()._2
-      assert(!tempShuffleFile3.exists(),
-        "All disks are broken, so there should be no temp shuffle file created")
-      assert(!tempLocalFile3.exists(),
-        "All disks are broken, so there should be no temp local file created")
-    } finally {
-      rootDir0.setExecutable(true)
-      rootDir1.setExecutable(true)
-    }
-
-  }
 }
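The deleted test simulated a bad disk by revoking execute permission on one of the block manager's root directories, and its own comment conceded that after ten retries the allocation could still land on the broken disk, making the middle asserts flaky. Here is a minimal standalone sketch of that simulation technique, with hypothetical paths and no DiskBlockManager involved:

import java.io.{File, IOException}
import java.nio.file.Files

// Sketch of the removed test's disk-failure simulation: on POSIX systems,
// clearing a directory's execute bit makes paths beneath it untraversable,
// so file creation under it fails, which is what the removed canCreateFile
// helper probed with File.createNewFile.
object BadDiskSimulationSketch {
  def main(args: Array[String]): Unit = {
    val root = Files.createTempDirectory("rootDir0").toFile
    val sub = new File(root, "sub")
    sub.mkdir()
    try {
      root.setExecutable(false) // "damage" the disk
      val created =
        try new File(sub, "temp_block").createNewFile()
        catch { case _: IOException => false }
      println(s"created under bad disk? $created") // expect false
    } finally {
      root.setExecutable(true) // restore so the temp dir can be deleted
      sub.delete()
      root.delete()
    }
  }
}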