Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.storage
import java.io.{File, IOException}
import java.util.UUID

import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.internal.{config, Logging}
Expand Down Expand Up @@ -117,20 +119,38 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea

/** Produces a unique block id and File suitable for storing local intermediate results. */
def createTempLocalBlock(): (TempLocalBlockId, File) = {
var blockId = new TempLocalBlockId(UUID.randomUUID())
while (getFile(blockId).exists()) {
blockId = new TempLocalBlockId(UUID.randomUUID())
var blockId = TempLocalBlockId(UUID.randomUUID())
var tempLocalFile = getFile(blockId)
var count = 0
while (!canCreateFile(tempLocalFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
blockId = TempLocalBlockId(UUID.randomUUID())
tempLocalFile = getFile(blockId)
count += 1
}
(blockId, getFile(blockId))
(blockId, tempLocalFile)
}

/** Produces a unique block id and File suitable for storing shuffled intermediate results. */
def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
var blockId = new TempShuffleBlockId(UUID.randomUUID())
while (getFile(blockId).exists()) {
blockId = new TempShuffleBlockId(UUID.randomUUID())
var blockId = TempShuffleBlockId(UUID.randomUUID())
var tempShuffleFile = getFile(blockId)
var count = 0
while (!canCreateFile(tempShuffleFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
blockId = TempShuffleBlockId(UUID.randomUUID())
tempShuffleFile = getFile(blockId)
count += 1
}
(blockId, tempShuffleFile)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I should have mentioned this earlier -- these two functions are exactly the same except for the constructor, right? Could have been refactored. (not a big deal, and not worth a followup just for this.)

}

private def canCreateFile(file: File): Boolean = {
try {
file.createNewFile()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it OK to leave the file created? This is different from before

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really good question. It looks safe to me, I don't think any of the writers check for file existence before opening a FileOutputStream etc. I also checked IndexBlockResolver, which has some checks on pre-existing files -- but those are checks on the final destination files, not the temporary intermediate files.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is only one temp file be created or none, after or before this change

} catch {
case NonFatal(_) =>
logError("Failed to create temporary block file: " + file.getAbsoluteFile)
false
}
(blockId, getFile(blockId))
}

/**
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private[spark] object Utils extends Logging {
*/
val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do we use it before?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its used in Utils.createDirectory, which is only used in the standalone cluster

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@volatile private var localRootDirs: Array[String] = null

/** Scheme used for files that are locally available on worker nodes in the cluster. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.storage

import java.io.{File, FileWriter}
import java.util.UUID

import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

Expand Down Expand Up @@ -52,7 +51,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
override def beforeEach() {
super.beforeEach()
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
}

Expand Down Expand Up @@ -91,4 +90,45 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
for (i <- 0 until numBytes) writer.write(i)
writer.close()
}

test("temporary shuffle/local file should be able to handle disk failures") {
try {
// the following two lines pre-create subdirectories under each root dir of block manager
diskBlockManager.getFile("1")
diskBlockManager.getFile("2")

val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists")
assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists")

// partial disks damaged
rootDir0.setExecutable(false)
val tempShuffleFile2 = diskBlockManager.createTempShuffleBlock()._2
val tempLocalFile2 = diskBlockManager.createTempLocalBlock()._2
// It's possible that after 10 retries we still not able to find the healthy disk. we need to
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan please check whether this is ok or not

// remove the flakiness of these two asserts
if (tempShuffleFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
assert(tempShuffleFile2.exists(),
"There is only one bad disk, so temp shuffle file should be created")
}
if (tempLocalFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
assert(tempLocalFile2.exists(),
"There is only one bad disk, so temp local file should be created")
}

// all disks damaged
rootDir1.setExecutable(false)
val tempShuffleFile3 = diskBlockManager.createTempShuffleBlock()._2
val tempLocalFile3 = diskBlockManager.createTempLocalBlock()._2
assert(!tempShuffleFile3.exists(),
"All disks are broken, so there should be no temp shuffle file created")
assert(!tempLocalFile3.exists(),
"All disks are broken, so there should be no temp local file created")
} finally {
rootDir0.setExecutable(true)
rootDir1.setExecutable(true)
}

}
}