Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
Expand Down Expand Up @@ -332,28 +333,76 @@ protected Path getRecoveryPath(String fileName) {
return _recoveryPath;
}

/**
 * Checks whether the current process has full access to the given path, using the
 * permission helpers of Hadoop's {@link FileUtil}.
 *
 * <p>The checks run in order: existence, read, write, execute. The first check that fails
 * is logged at WARN level and the method returns false without running the remaining ones.
 * Callers in this class pass directories; on POSIX systems the execute bit on a directory
 * is what allows entering it, which is why execute access is required as well.
 *
 * @param file File or directory to check; must not be null
 * @return True if the path exists and the process has read, write and execute access on it,
 *         or false otherwise.
 */
protected Boolean checkFileAccess(File file) {
  if (!file.exists()) {
    logger.warn("File does not exist: {}", file);
    return false;
  }

  if (!FileUtil.canRead(file)) {
    logger.warn("File is not readable: {}", file);
    return false;
  }

  if (!FileUtil.canWrite(file)) {
    logger.warn("File is not writable: {}", file);
    return false;
  }

  if (!FileUtil.canExecute(file)) {
    logger.warn("File is not executable: {}", file);
    return false;
  }

  return true;
}

/**
* Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
* when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
* it will use a YARN local dir.
*/
protected File initRecoveryDb(String dbName) {
protected File initRecoveryDb(String dbName) throws IOException {
Boolean bolRecoveryPathAvailable = true;

if (_recoveryPath != null) {
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
if (recoveryFile.exists()) {

bolRecoveryPathAvailable = checkFileAccess(new File(_recoveryPath.toUri().getPath()));
logger.info("Recovery path {} ldb available: {}.", _recoveryPath, bolRecoveryPathAvailable);

if (recoveryFile.exists() && bolRecoveryPathAvailable) {
return recoveryFile;
}
}

// If the recovery path is unavailable, stop using it.
if (!bolRecoveryPathAvailable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the recovery path is either set by the user or falls back to the YARN default. The user should make sure this directory is available, since YARN relies on it internally. It doesn't make sense to switch to another disk if the recovery path is unavailable.

logger.warn("Recovery path {} unavailable: set it to null", _recoveryPath);
_recoveryPath = null;
}

// db doesn't exist in recovery path go check local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
File f = new File(new Path(dir).toUri().getPath(), dbName);
// 1. If `_recoveryPath` is not set, `f` should be writable;
// 2. If `_recoveryPath` is set, `newLoc` should be writable;
if (f.exists()) {
if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code.
_recoveryPath = new Path(dir);
return f;
if (checkFileAccess(f)) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code.
_recoveryPath = new Path(dir);
return f;
}
} else {
// If the recovery path is set then either NM recovery is enabled or another recovery
// DB has been initialized. If NM recovery is enabled and had set the recovery path
Expand All @@ -378,8 +427,24 @@ protected File initRecoveryDb(String dbName) {
}
}
}

// Find a local_dir which is writable, to avoid creating ldb in a read-only disk.
if (_recoveryPath == null) {
for (String dir : localDirs) {
File f = new File(dir);
if (checkFileAccess(f)) {
_recoveryPath = new Path(dir);
break;
} else {
logger.warn("Local dir {} is not reachable.", dir);
}
}
}

if (_recoveryPath == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If _recoveryPath is still null, I think we should throw an exception here, since none of the disks is usable.

_recoveryPath = new Path(localDirs[0]);
throw new IOException("Failed to choose a reachable DB recovery path, " +
"please check `yarn.nodemanager.local-dirs` and `yarn.nodemanager.recovery.dir` is available " +
"in the `yarn-site.xml`.");
}

return new File(_recoveryPath.toUri().getPath(), dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,48 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
new ApplicationInitializationContext(user, appId, secret)
}

test("SPARK-21660: get the correct init recovery path considering file access") {
  // Scenario 1: the recovery path handed to the shuffle service is a directory without
  // write permission. After init, the service is expected to have switched its recovery
  // path to the first dir configured in `yarn.nodemanager.local-dirs`.
  s1 = new YarnShuffleService
  val recoveryDir = Utils.createTempDir()
  val recoveryPath = new Path(recoveryDir.toURI)
  Files.setPosixFilePermissions(recoveryDir.toPath, EnumSet.of(OWNER_READ, OWNER_EXECUTE))
  s1.setRecoveryPath(recoveryPath)

  s1.init(yarnConfig)
  // The matcher and its operand must stay on one line. If the line ends after `be`, Scala's
  // newline inference ends the statement there and the comparison is never evaluated.
  val expectedPath = new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0))
  s1._recoveryPath should be (expectedPath)
  s1.stop()

  // Scenario 2: the same non-writable recovery path is used, and the only dir configured in
  // `yarn.nodemanager.local-dirs` is non-writable as well, so init must fail with an
  // IOException (wrapped in a ServiceStateException).
  s2 = new YarnShuffleService
  s2.setRecoveryPath(recoveryPath)

  val yarnConfig2 = new YarnConfiguration()
  yarnConfig2.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
  yarnConfig2.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
    classOf[YarnShuffleService].getCanonicalName)
  yarnConfig2.setInt("spark.shuffle.service.port", 0)
  yarnConfig2.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)

  val localDir2 = Utils.createTempDir()
  Files.setPosixFilePermissions(localDir2.toPath, EnumSet.of(OWNER_READ, OWNER_EXECUTE))
  yarnConfig2.set(YarnConfiguration.NM_LOCAL_DIRS, localDir2.getAbsolutePath)

  try {
    val error = intercept[ServiceStateException] {
      s2.init(yarnConfig2)
    }
    assert(error.getCause.isInstanceOf[IOException])
  } finally {
    // Restore write permission on both temp dirs before stopping the service.
    Files.setPosixFilePermissions(recoveryDir.toPath,
      EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
    Files.setPosixFilePermissions(localDir2.toPath,
      EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
    s2.stop()
  }
}

}