@@ -68,13 +68,21 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;

private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";

// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;

// The actual server that serves shuffle files
private TransportServer shuffleServer = null;

private Configuration _conf = null;

// The recovery path used for shuffle service recovery
@VisibleForTesting
Path _recoveryPath = null;

// Handles registering executors and opening shuffle blocks
@VisibleForTesting
ExternalShuffleBlockHandler blockHandler;
@@ -112,14 +120,15 @@ private boolean isAuthenticationEnabled() {
*/
@Override
protected void serviceInit(Configuration conf) {
_conf = conf;

// In case this NM was killed while there were running spark applications, we need to restore
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
// If we don't find one, then we choose a file to use to save the state next time. Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
registeredExecutorFile =
findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
@@ -190,16 +199,6 @@ public void stopContainer(ContainerTerminationContext context) {
logger.info("Stopping container {}", containerId);
}

private File findRegisteredExecutorFile(String[] localDirs) {
for (String dir: localDirs) {
File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
if (f.exists()) {
return f;
}
}
return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
}

/**
* Close the shuffle server to clean up any associated state.
*/
@@ -222,4 +221,47 @@ protected void serviceStop() {
public ByteBuffer getMetaData() {
return ByteBuffer.allocate(0);
}

/**
* Set the recovery path for shuffle service recovery when the NM is restarted. This method is
* overridden and called by YARN when the Hadoop version is 2.5+ and NM recovery is enabled;
* otherwise we have to call it manually to set our own recovery path.
*/
public void setRecoveryPath(Path recoveryPath) {
_recoveryPath = recoveryPath;
}

/**
* Get the recovery path; this overrides the default so that we can return our own maintained
* recovery path.
*/
protected Path getRecoveryPath() {
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
if (f.exists()) {
if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code.
_recoveryPath = new Path(dir);
} else {
// If NM recovery is enabled and the recovery file exists in the old NM local dirs, an old
// version of Spark already generated the recovery file; move the old file into the new
// recovery path for compatibility.
if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
// Failed to move the recovery file to the new path
logger.error("Failed to move recovery file {} to the path {}",
RECOVERY_FILE_NAME, _recoveryPath.toString());
}
}
break;
}
}

if (_recoveryPath == null) {
_recoveryPath = new Path(localDirs[0]);
}

return _recoveryPath;
}
}
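The new getRecoveryPath() above folds three cases into one method: a YARN-provided recovery path (NM recovery enabled), a legacy recovery file left in the NM local dirs by an older shuffle service, and a fresh start. A minimal Scala sketch of that fallback order — not the patch itself, and with hypothetical names such as `RecoveryPathSketch` and `resolveRecoveryPath` — might look like this:

```scala
import java.io.File
import org.apache.hadoop.fs.Path

object RecoveryPathSketch {
  // Same file name as the RECOVERY_FILE_NAME constant introduced in the diff.
  val recoveryFileName = "registeredExecutor.ldb"

  def resolveRecoveryPath(yarnProvidedPath: Option[Path], localDirs: Seq[String]): Path = {
    // Look for a legacy recovery file written into the NM local dirs by an older shuffle service.
    val legacyFile = localDirs.iterator
      .map(dir => new File(new Path(dir).toUri.getPath, recoveryFileName))
      .find(_.exists())

    (yarnProvidedPath, legacyFile) match {
      case (Some(path), Some(old)) =>
        // NM recovery is enabled and an old file exists: move it under the YARN-managed path.
        old.renameTo(new File(path.toUri.getPath, recoveryFileName))
        path
      case (Some(path), None) =>
        path
      case (None, Some(old)) =>
        // No NM recovery: keep using the local dir that already holds the file.
        new Path(old.getParent)
      case (None, None) =>
        // No NM recovery and no previous state: default to the first local dir.
        new Path(localDirs.head)
    }
  }
}
```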
@@ -19,16 +19,20 @@ package org.apache.spark.network.yarn
import java.io.{DataOutputStream, File, FileOutputStream}

import scala.annotation.tailrec
import scala.concurrent.duration._

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts

import org.apache.spark.SparkFunSuite
import org.apache.spark.network.shuffle.ShuffleTestAccessor
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.util.Utils

class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
@@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.setInt("spark.shuffle.service.port", 0)

yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
val d = new File(dir)
if (d.exists()) {
FileUtils.deleteDirectory(d)
}
FileUtils.forceMkdir(d)
logInfo(s"creating yarn.nodemanager.local-dirs: $d")
}
val localDir = Utils.createTempDir()
yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
}

var s1: YarnShuffleService = null
@@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s3.initializeApplication(app2Data)
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
s3.stop()
}

test("get correct recovery path") {
Contributor review comment: could you also add a test to make sure the move happens properly when upgrading.

// Test that the recovery path is set outside the shuffle service. This simulates the
// NM-recovery-enabled scenario, where the recovery path is set by YARN.
s1 = new YarnShuffleService
val recoveryPath = new Path(Utils.createTempDir().toURI)
s1.setRecoveryPath(recoveryPath)

s1.init(yarnConfig)
s1._recoveryPath should be (recoveryPath)
s1.stop()

// Test that the recovery path is set inside the shuffle service. This happens when NM
// recovery is not enabled or is not supported (Hadoop 2.5-).
s2 = new YarnShuffleService
s2.init(yarnConfig)
s2._recoveryPath should be (
new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
s2.stop()
}

}
test("moving recovery file form NM local dir to recovery path") {
// This tests that when Hadoop is upgraded to 2.5+ and NM recovery is enabled, the old
// recovery file is moved to the new path to keep compatibility.

// Simulate s1 running on an old version of Hadoop, where the recovery file lives in the NM
// local dir.
s1 = new YarnShuffleService
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
val app1Data: ApplicationInitializationContext =
new ApplicationInitializationContext("user", app1Id, null)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(0, 2)
val app2Data: ApplicationInitializationContext =
new ApplicationInitializationContext("user", app2Id, null)
s1.initializeApplication(app2Data)

val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)

val blockHandler = s1.blockHandler
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)

blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
be (Some(shuffleInfo1))
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
be (Some(shuffleInfo2))

assert(execStateFile.exists(), s"$execStateFile did not exist")

s1.stop()

// Simulate s2 running on Hadoop 2.5+ with NM recovery enabled.
assert(execStateFile.exists())
val recoveryPath = new Path(Utils.createTempDir().toURI)
s2 = new YarnShuffleService
s2.setRecoveryPath(recoveryPath)
s2.init(yarnConfig)

val execStateFile2 = s2.registeredExecutorFile
recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
eventually(timeout(10 seconds), interval(5 millis)) {
assert(!execStateFile.exists())
}

val handler2 = s2.blockHandler
val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)

// now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
// during the restart
// Since the recovery file is picked up from the old path, the previous state should be preserved.
s2.initializeApplication(app1Data)
s2.stopApplication(new ApplicationTerminationContext(app2Id))
ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)

s2.stop()
}
}
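The javadoc on setRecoveryPath notes that when NM recovery is not available, the caller has to set the recovery path manually. A hedged usage sketch of driving the service that way, mirroring the "get correct recovery path" test above — the object name and temp directories are illustrative, not part of the patch:

```scala
import java.nio.file.Files
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.network.yarn.YarnShuffleService

object ManualRecoveryPathSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration
    conf.set("yarn.nodemanager.local-dirs", Files.createTempDirectory("nm-local").toString)

    val service = new YarnShuffleService
    // On Hadoop 2.5+ with NM recovery enabled, YARN calls setRecoveryPath itself; here we make
    // the call by hand, which is what the test does to simulate that scenario.
    service.setRecoveryPath(new Path(Files.createTempDirectory("nm-recovery").toUri))
    service.init(conf)
    // ... applications would register executors and fetch shuffle blocks here ...
    service.stop()
  }
}
```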