From 5abbe75072cf3f172f0b2e448941b94d72268c90 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 24 Aug 2017 11:28:48 +0800 Subject: [PATCH 1/4] Avoid writing shuffle metadata to disk if NM recovery is disabled Change-Id: Id062d71589f46052706058c151c706dae38b1e6e --- .../network/yarn/YarnShuffleService.java | 117 +++++++++--------- .../yarn/YarnShuffleIntegrationSuite.scala | 33 +++-- .../yarn/YarnShuffleServiceSuite.scala | 38 ++++-- 3 files changed, 113 insertions(+), 75 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index cd67eb28573e8..e5e35c8c83ab4 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -29,12 +29,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.*; import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; @@ -73,6 +75,8 @@ public class YarnShuffleService extends AuxiliaryService { private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); + private static final boolean DEFAULT_NM_RECOVERY_ENABLED = false; + // Port on which the shuffle server listens for fetch requests private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; @@ -153,6 +157,8 @@ protected void serviceInit(Configuration conf) throws Exception { _conf = conf; boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); + boolean recoveryEnabled = conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, + DEFAULT_NM_RECOVERY_ENABLED); try { // In case this NM was killed while there were running spark applications, we need to restore @@ -160,7 +166,9 @@ protected void serviceInit(Configuration conf) throws Exception { // If we don't find one, then we choose a file to use to save the state next time. 
Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
- registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+ if (recoveryEnabled) {
+ registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+ }
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
@@ -170,7 +178,7 @@ protected void serviceInit(Configuration conf) throws Exception {
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
if (authEnabled) {
- createSecretManager();
+ createSecretManager(recoveryEnabled);
bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
}
@@ -194,30 +202,33 @@ protected void serviceInit(Configuration conf) throws Exception {
}
}
- private void createSecretManager() throws IOException {
+ private void createSecretManager(boolean recoveryEnabled) throws IOException {
secretManager = new ShuffleSecretManager();
- secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
-
- // Make sure this is protected in case its not in the NM recovery dir
- FileSystem fs = FileSystem.getLocal(_conf);
- fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
-
- db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
- logger.info("Recovery location is: " + secretsFile.getPath());
- if (db != null) {
- logger.info("Going to reload spark shuffle data");
- DBIterator itr = db.iterator();
- itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
- while (itr.hasNext()) {
- Map.Entry<byte[], byte[]> e = itr.next();
- String key = new String(e.getKey(), StandardCharsets.UTF_8);
- if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
- break;
+
+ if (recoveryEnabled) {
+ secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
+
+ // Make sure this is protected in case it's not in the NM recovery dir
+ FileSystem fs = FileSystem.getLocal(_conf);
+ fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
+
+ db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
+ logger.info("Recovery location is: " + secretsFile.getPath());
+ if (db != null) {
+ logger.info("Going to reload spark shuffle data");
+ DBIterator itr = db.iterator();
+ itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
+ break;
+ }
+ String id = parseDbAppKey(key);
+ ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
+ logger.info("Reloading tokens for app: " + id);
+ secretManager.registerApp(id, secret);
}
- String id = parseDbAppKey(key);
- ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
- logger.info("Reloading tokens for app: " + id);
- secretManager.registerApp(id, secret);
}
}
}
@@ -338,49 +349,41 @@ protected Path getRecoveryPath(String fileName) {
* it will uses a YARN local dir.
*/ protected File initRecoveryDb(String dbName) { - if (_recoveryPath != null) { - File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName); - if (recoveryFile.exists()) { - return recoveryFile; - } + Preconditions.checkNotNull(_recoveryPath, + "recovery path should not be null if NM recovery is enabled"); + + File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName); + if (recoveryFile.exists()) { + return recoveryFile; } + // db doesn't exist in recovery path go check local dirs for it String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); for (String dir : localDirs) { File f = new File(new Path(dir).toUri().getPath(), dbName); if (f.exists()) { - if (_recoveryPath == null) { - // If NM recovery is not enabled, we should specify the recovery path using NM local - // dirs, which is compatible with the old code. - _recoveryPath = new Path(dir); - return f; - } else { - // If the recovery path is set then either NM recovery is enabled or another recovery - // DB has been initialized. If NM recovery is enabled and had set the recovery path - // make sure to move all DBs to the recovery path from the old NM local dirs. - // If another DB was initialized first just make sure all the DBs are in the same - // location. - Path newLoc = new Path(_recoveryPath, dbName); - Path copyFrom = new Path(f.toURI()); - if (!newLoc.equals(copyFrom)) { - logger.info("Moving " + copyFrom + " to: " + newLoc); - try { - // The move here needs to handle moving non-empty directories across NFS mounts - FileSystem fs = FileSystem.getLocal(_conf); - fs.rename(copyFrom, newLoc); - } catch (Exception e) { - // Fail to move recovery file to new path, just continue on with new DB location - logger.error("Failed to move recovery file {} to the path {}", - dbName, _recoveryPath.toString(), e); - } + // If the recovery path is set then either NM recovery is enabled or another recovery + // DB has been initialized. If NM recovery is enabled and had set the recovery path + // make sure to move all DBs to the recovery path from the old NM local dirs. + // If another DB was initialized first just make sure all the DBs are in the same + // location. 
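+ //
+ // In isolation, the move below amounts to this sketch (paths illustrative):
+ //
+ //   FileSystem fs = FileSystem.getLocal(_conf);
+ //   Path from = new Path("/yarn/local-dir-0/" + dbName);  // old NM local dir
+ //   Path to = new Path(_recoveryPath, dbName);            // NM recovery dir
+ //   if (!fs.rename(from, to)) {
+ //     // best effort: on failure the service starts a fresh DB at the
+ //     // recovery location rather than aborting startup
+ //   }
+ //
+ // Hadoop's local FileSystem is used instead of java.io.File#renameTo because
+ // renameTo generally cannot move a non-empty directory across mount points,
+ // while the Hadoop implementation can fall back to a copy.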
+ Path newLoc = new Path(_recoveryPath, dbName);
+ Path copyFrom = new Path(f.toURI());
+ if (!newLoc.equals(copyFrom)) {
+ logger.info("Moving " + copyFrom + " to: " + newLoc);
+ try {
+ // The move here needs to handle moving non-empty directories across NFS mounts
+ FileSystem fs = FileSystem.getLocal(_conf);
+ fs.rename(copyFrom, newLoc);
+ } catch (Exception e) {
+ // Failed to move the recovery file to the new path; just continue on with the new DB location
+ logger.error("Failed to move recovery file {} to the path {}",
+ dbName, _recoveryPath.toString(), e);
}
- return new File(newLoc.toUri().getPath());
}
}
- if (_recoveryPath == null) {
- _recoveryPath = new Path(localDirs[0]);
- }
return new File(_recoveryPath.toUri().getPath(), dbName);
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 13472f2ece184..01db796096f26 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -70,11 +70,18 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
val finalState = runSpark(
false,
mainClassName(YarnExternalShuffleDriver.getClass),
- appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
+ appArgs = if (registeredExecFile != null) {
+ Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
+ } else {
+ Seq(result.getAbsolutePath)
+ },
extraConf = extraSparkConf()
)
checkResult(finalState, result)
- assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
+
+ if (registeredExecFile != null) {
+ assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
+ }
}
}
@@ -105,7 +112,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
def main(args: Array[String]): Unit = {
- if (args.length != 2) {
+ if (args.length == 0 || args.length > 2) {
// scalastyle:off println
System.err.println(
s"""
@@ -121,10 +128,16 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
.setAppName("External Shuffle Test"))
val conf = sc.getConf
val status = new File(args(0))
- val registeredExecFile = new File(args(1))
+ val registeredExecFile = if (args.length == 2) {
+ new File(args(1))
+ } else {
+ null
+ }
logInfo("shuffle service executor file = " + registeredExecFile)
var result = "failure"
- val execStateCopy = new File(registeredExecFile.getAbsolutePath + "_dup")
+ val execStateCopy = Option(registeredExecFile).map { file =>
+ new File(file.getAbsolutePath + "_dup")
+ }.orNull
try {
val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }.
collect().toSet @@ -132,11 +145,15 @@ private object YarnExternalShuffleDriver extends Logging with Matchers { data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet) result = "success" // only one process can open a leveldb file at a time, so we copy the files - FileUtils.copyDirectory(registeredExecFile, execStateCopy) - assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty) + if (registeredExecFile != null && execStateCopy != null) { + FileUtils.copyDirectory(registeredExecFile, execStateCopy) + assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty) + } } finally { sc.stop() - FileUtils.deleteDirectory(execStateCopy) + if (execStateCopy != null) { + FileUtils.deleteDirectory(execStateCopy) + } Files.write(result, status, StandardCharsets.UTF_8) } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index a58784f59676a..2e25bf2bbc036 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -44,6 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd private[yarn] var yarnConfig: YarnConfiguration = null private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager" + private var recoveryLocalDir: File = _ + override def beforeEach(): Unit = { super.beforeEach() yarnConfig = new YarnConfiguration() @@ -54,6 +56,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true) val localDir = Utils.createTempDir() yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath) + + recoveryLocalDir = Utils.createTempDir() } var s1: YarnShuffleService = null @@ -80,7 +84,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } test("executor state kept across NM restart") { + yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService + s1.setRecoveryPath(new Path(recoveryLocalDir.toURI)) // set auth to true to test the secrets recovery yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) s1.init(yarnConfig) @@ -123,6 +129,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // now we pretend the shuffle service goes down, and comes back up s1.stop() s2 = new YarnShuffleService + s2.setRecoveryPath(new Path(recoveryLocalDir.toURI)) s2.init(yarnConfig) s2.secretsFile should be (secretsFile) s2.registeredExecutorFile should be (execStateFile) @@ -140,6 +147,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // Act like the NM restarts one more time s2.stop() s3 = new YarnShuffleService + s3.setRecoveryPath(new Path(recoveryLocalDir.toURI)) s3.init(yarnConfig) s3.registeredExecutorFile should be (execStateFile) s3.secretsFile should be (secretsFile) @@ -155,7 +163,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } test("removed applications should not be in registered executor file") { + yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService + s1.setRecoveryPath(new Path(recoveryLocalDir.toURI)) yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false) 
s1.init(yarnConfig) val secretsFile = s1.secretsFile @@ -189,7 +199,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } test("shuffle service should be robust to corrupt registered executor file") { + yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService + s1.setRecoveryPath(new Path(recoveryLocalDir.toURI)) s1.init(yarnConfig) val app1Id = ApplicationId.newInstance(0, 1) val app1Data = makeAppInfo("user", app1Id) @@ -215,6 +227,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd out.close() s2 = new YarnShuffleService + s2.setRecoveryPath(new Path(recoveryLocalDir.toURI)) s2.init(yarnConfig) s2.registeredExecutorFile should be (execStateFile) @@ -234,6 +247,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // another stop & restart should be fine though (eg., we recover from previous corruption) s3 = new YarnShuffleService + s3.setRecoveryPath(new Path(recoveryLocalDir.toURI)) s3.init(yarnConfig) s3.registeredExecutorFile should be (execStateFile) val handler3 = s3.blockHandler @@ -247,6 +261,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd test("get correct recovery path") { // Test recovery path is set outside the shuffle service, this is to simulate NM recovery // enabled scenario, where recovery path will be set by yarn. + yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService val recoveryPath = new Path(Utils.createTempDir().toURI) s1.setRecoveryPath(recoveryPath) @@ -254,14 +269,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s1.init(yarnConfig) s1._recoveryPath should be (recoveryPath) s1.stop() - - // Test recovery path is set inside the shuffle service, this will be happened when NM - // recovery is not enabled or there's no NM recovery (Hadoop 2.5-). - s2 = new YarnShuffleService - s2.init(yarnConfig) - s2._recoveryPath should be - (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0))) - s2.stop() } test("moving recovery file from NM local dir to recovery path") { @@ -270,7 +277,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local // dir. + yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService + s1.setRecoveryPath(new Path(yarnConfig.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)(0))) // set auth to true to test the secrets recovery yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) s1.init(yarnConfig) @@ -308,7 +317,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // Simulate s2 is running on Hadoop 2.5+ with NM recovery is enabled. assert(execStateFile.exists()) - val recoveryPath = new Path(Utils.createTempDir().toURI) + val recoveryPath = new Path(recoveryLocalDir.toURI) s2 = new YarnShuffleService s2.setRecoveryPath(recoveryPath) s2.init(yarnConfig) @@ -347,10 +356,11 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // Set up a read-only local dir. val roDir = Utils.createTempDir() Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE)) - yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath()) + yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") // Try to start the shuffle service, it should fail. 
val service = new YarnShuffleService() + service.setRecoveryPath(new Path(roDir.toURI)) try { val error = intercept[ServiceStateException] { @@ -369,4 +379,12 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd new ApplicationInitializationContext(user, appId, secret) } + test("recovery db should not be created if NM recovery is not enabled") { + s1 = new YarnShuffleService + s1.init(yarnConfig) + s1._recoveryPath should be (null) + s1.registeredExecutorFile should be (null) + s1.secretsFile should be (null) + } + } From 185d3dd7be54226e8badc7c8aa065ce61f839860 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 28 Aug 2017 17:14:33 +0800 Subject: [PATCH 2/4] Address the comments Change-Id: I06866813d24af5cd6ae64f45df1c7a4ebaf2b12d --- .../network/yarn/YarnShuffleService.java | 61 +++++++++---------- .../yarn/YarnShuffleServiceSuite.scala | 6 -- 2 files changed, 28 insertions(+), 39 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index e5e35c8c83ab4..c6ea75c4b75cf 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -36,7 +36,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.*; import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; @@ -75,8 +74,6 @@ public class YarnShuffleService extends AuxiliaryService { private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); - private static final boolean DEFAULT_NM_RECOVERY_ENABLED = false; - // Port on which the shuffle server listens for fetch requests private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; @@ -157,8 +154,6 @@ protected void serviceInit(Configuration conf) throws Exception { _conf = conf; boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); - boolean recoveryEnabled = conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, - DEFAULT_NM_RECOVERY_ENABLED); try { // In case this NM was killed while there were running spark applications, we need to restore @@ -166,7 +161,7 @@ protected void serviceInit(Configuration conf) throws Exception { // If we don't find one, then we choose a file to use to save the state next time. 
Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
- if (recoveryEnabled) {
+ if (_recoveryPath != null) {
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
}
@@ -178,7 +173,10 @@ protected void serviceInit(Configuration conf) throws Exception {
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
if (authEnabled) {
- createSecretManager(recoveryEnabled);
+ secretManager = new ShuffleSecretManager();
+ if (_recoveryPath != null) {
+ loadSecretsFromDb();
+ }
bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
}
@@ -202,33 +200,29 @@ protected void serviceInit(Configuration conf) throws Exception {
}
}
- private void createSecretManager(boolean recoveryEnabled) throws IOException {
- secretManager = new ShuffleSecretManager();
-
- if (recoveryEnabled) {
- secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
-
- // Make sure this is protected in case it's not in the NM recovery dir
- FileSystem fs = FileSystem.getLocal(_conf);
- fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
-
- db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
- logger.info("Recovery location is: " + secretsFile.getPath());
- if (db != null) {
- logger.info("Going to reload spark shuffle data");
- DBIterator itr = db.iterator();
- itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
- while (itr.hasNext()) {
- Map.Entry<byte[], byte[]> e = itr.next();
- String key = new String(e.getKey(), StandardCharsets.UTF_8);
- if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
- break;
+ private void loadSecretsFromDb() throws IOException {
+ secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
+
+ // Make sure this is protected in case it's not in the NM recovery dir
+ FileSystem fs = FileSystem.getLocal(_conf);
+ fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
+
+ db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
+ logger.info("Recovery location is: " + secretsFile.getPath());
+ if (db != null) {
+ logger.info("Going to reload spark shuffle data");
+ DBIterator itr = db.iterator();
+ itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
+ break;
}
String id = parseDbAppKey(key);
ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
logger.info("Reloading tokens for app: " + id);
secretManager.registerApp(id, secret);
}
}
}
@@ -332,6 +326,7 @@ public ByteBuffer getMetaData() {
* overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we
* have to manually call this to set our own recovery path.
*/ + @Override public void setRecoveryPath(Path recoveryPath) { _recoveryPath = recoveryPath; } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 2e25bf2bbc036..268f4bd13f6c3 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -84,7 +84,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } test("executor state kept across NM restart") { - yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService s1.setRecoveryPath(new Path(recoveryLocalDir.toURI)) // set auth to true to test the secrets recovery @@ -163,7 +162,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } test("removed applications should not be in registered executor file") { - yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService s1.setRecoveryPath(new Path(recoveryLocalDir.toURI)) yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false) @@ -199,7 +197,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } test("shuffle service should be robust to corrupt registered executor file") { - yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService s1.setRecoveryPath(new Path(recoveryLocalDir.toURI)) s1.init(yarnConfig) @@ -261,7 +258,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd test("get correct recovery path") { // Test recovery path is set outside the shuffle service, this is to simulate NM recovery // enabled scenario, where recovery path will be set by yarn. - yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService val recoveryPath = new Path(Utils.createTempDir().toURI) s1.setRecoveryPath(recoveryPath) @@ -277,7 +273,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local // dir. - yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") s1 = new YarnShuffleService s1.setRecoveryPath(new Path(yarnConfig.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)(0))) // set auth to true to test the secrets recovery @@ -356,7 +351,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd // Set up a read-only local dir. val roDir = Utils.createTempDir() Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE)) - yarnConfig.set(YarnConfiguration.NM_RECOVERY_ENABLED, "true") // Try to start the shuffle service, it should fail. 
val service = new YarnShuffleService()

From 9fe7e993ace695dc1c0bcdafee6f929ad9a9a0f1 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Mon, 28 Aug 2017 21:50:14 +0800
Subject: [PATCH 3/4] Change the comment

Change-Id: I90ae354f840534fad0b448392cab5713eb7c7171
---
.../java/org/apache/spark/network/yarn/YarnShuffleService.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index c6ea75c4b75cf..d978a7d8f8f73 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -340,8 +340,7 @@ protected Path getRecoveryPath(String fileName) {
/**
* Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
- * when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
- * it will uses a YARN local dir.
+ * and a DB created by an old version of the shuffle service exists in the NM local dir.
*/
protected File initRecoveryDb(String dbName) {
Preconditions.checkNotNull(_recoveryPath,

From ebe0a24150f196df52daf1304b13e15454750db7 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 31 Aug 2017 08:48:27 +0800
Subject: [PATCH 4/4] Update the comments

Change-Id: I85129842468e74c3a91232991595916d50b206fc
---
.../org/apache/spark/network/yarn/YarnShuffleService.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index d978a7d8f8f73..d8b2ed6b5dc7b 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -322,9 +322,8 @@ public ByteBuffer getMetaData() {
}
/**
- * Set the recovery path for shuffle service recovery when NM is restarted. The method will be
- * overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we
- * have to manually call this to set our own recovery path.
+ * Set the recovery path for shuffle service recovery when NM is restarted. This will be called
+ * by the NM if NM recovery is enabled.
*/
@Override
public void setRecoveryPath(Path recoveryPath) {
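As a closing illustration of the contract the final revision relies on: the NM invokes setRecoveryPath() on an auxiliary service only when yarn.nodemanager.recovery.enabled is true, so inside serviceInit() a null _recoveryPath now simply means recovery is off and no LevelDB files should be created. The mini-harness below is a sketch mirroring that calling sequence; the class name and local path are hypothetical, for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.spark.network.yarn.YarnShuffleService;

public class RecoveryPathContractSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Flip to false to simulate an NM with recovery disabled.
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);

    YarnShuffleService shuffle = new YarnShuffleService();
    if (conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false)) {
      // The NM makes this call before init(), and only when recovery is on.
      shuffle.setRecoveryPath(new Path("/var/yarn/nm-recovery/spark_shuffle"));
    }
    shuffle.init(conf);
    // With no recovery path set, the service keeps executor registrations and
    // app secrets in memory only: no registeredExecutors.ldb or
    // sparkShuffleRecovery.ldb directory is created, matching the new test
    // "recovery db should not be created if NM recovery is not enabled".
    shuffle.stop();
  }
}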