Skip to content

Commit aab99d3

Browse files
jerryshaoTom Graves
authored andcommitted
[SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled
## What changes were proposed in this pull request? From Hadoop 2.5+, the YARN NM supports NM recovery, which uses a recovery path for auxiliary services such as spark_shuffle and mapreduce_shuffle. This change uses that path instead of the NM local dir if NM recovery is enabled. ## How was this patch tested? Unit test + local test. Author: jerryshao <[email protected]> Closes #12994 from jerryshao/SPARK-14963.
1 parent a019e6e commit aab99d3

File tree

2 files changed

+143
-22
lines changed

2 files changed

+143
-22
lines changed

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,21 @@ public class YarnShuffleService extends AuxiliaryService {
6868
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
6969
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
7070

71+
// Name of the state file holding registered executors. It must stay identical to the name
// written by earlier releases ("registeredExecutors.ldb"), otherwise an existing file in the
// NM local dirs is never found and cannot be reused or moved to the recovery path.
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
72+
7173
// An entity that manages the shuffle secret per application
7274
// This is used only if authentication is enabled
7375
private ShuffleSecretManager secretManager;
7476

7577
// The actual server that serves shuffle files
7678
private TransportServer shuffleServer = null;
7779

80+
private Configuration _conf = null;
81+
82+
// The recovery path used for shuffle service recovery
83+
@VisibleForTesting
84+
Path _recoveryPath = null;
85+
7886
// Handles registering executors and opening shuffle blocks
7987
@VisibleForTesting
8088
ExternalShuffleBlockHandler blockHandler;
@@ -112,14 +120,15 @@ private boolean isAuthenticationEnabled() {
112120
*/
113121
@Override
114122
protected void serviceInit(Configuration conf) {
123+
_conf = conf;
115124

116125
// In case this NM was killed while there were running spark applications, we need to restore
117126
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
118127
// If we don't find one, then we choose a file to use to save the state next time. Even if
119128
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
120129
// when it comes back
121130
registeredExecutorFile =
122-
findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
131+
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
123132

124133
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
125134
// If authentication is enabled, set up the shuffle server to use a
@@ -190,16 +199,6 @@ public void stopContainer(ContainerTerminationContext context) {
190199
logger.info("Stopping container {}", containerId);
191200
}
192201

193-
private File findRegisteredExecutorFile(String[] localDirs) {
194-
for (String dir: localDirs) {
195-
File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
196-
if (f.exists()) {
197-
return f;
198-
}
199-
}
200-
return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
201-
}
202-
203202
/**
204203
* Close the shuffle server to clean up any associated state.
205204
*/
@@ -222,4 +221,47 @@ protected void serviceStop() {
222221
public ByteBuffer getMetaData() {
223222
return ByteBuffer.allocate(0);
224223
}
224+
225+
/**
226+
* Set the recovery path for shuffle service recovery when NM is restarted. The method will be
227+
* overridden and called when the Hadoop version is 2.5+ and NM recovery is enabled; otherwise we
228+
* have to manually call this to set our own recovery path.
229+
*/
230+
public void setRecoveryPath(Path recoveryPath) {
231+
_recoveryPath = recoveryPath;
232+
}
233+
234+
/**
235+
* Get the recovery path. This overrides the default one in order to return our own maintained
236+
* recovery path.
237+
*/
238+
protected Path getRecoveryPath() {
239+
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
240+
for (String dir : localDirs) {
241+
File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
242+
if (f.exists()) {
243+
if (_recoveryPath == null) {
244+
// If NM recovery is not enabled, we should specify the recovery path using NM local
245+
// dirs, which is compatible with the old code.
246+
_recoveryPath = new Path(dir);
247+
} else {
248+
// If NM recovery is enabled and the recovery file exists in old NM local dirs, which
249+
// means old version of Spark already generated the recovery file, we should copy the
250+
// old file in to a new recovery path for the compatibility.
251+
if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
252+
// Fail to move recovery file to new path
253+
logger.error("Failed to move recovery file {} to the path {}",
254+
RECOVERY_FILE_NAME, _recoveryPath.toString());
255+
}
256+
}
257+
break;
258+
}
259+
}
260+
261+
if (_recoveryPath == null) {
262+
_recoveryPath = new Path(localDirs[0]);
263+
}
264+
265+
return _recoveryPath;
266+
}
225267
}

yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ package org.apache.spark.network.yarn
1919
import java.io.{DataOutputStream, File, FileOutputStream}
2020

2121
import scala.annotation.tailrec
22+
import scala.concurrent.duration._
2223

23-
import org.apache.commons.io.FileUtils
24+
import org.apache.hadoop.fs.Path
2425
import org.apache.hadoop.yarn.api.records.ApplicationId
2526
import org.apache.hadoop.yarn.conf.YarnConfiguration
2627
import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
2728
import org.scalatest.{BeforeAndAfterEach, Matchers}
29+
import org.scalatest.concurrent.Eventually._
30+
import org.scalatest.concurrent.Timeouts
2831

2932
import org.apache.spark.SparkFunSuite
3033
import org.apache.spark.network.shuffle.ShuffleTestAccessor
3134
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
35+
import org.apache.spark.util.Utils
3236

3337
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
3438
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
@@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
4044
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
4145
classOf[YarnShuffleService].getCanonicalName)
4246
yarnConfig.setInt("spark.shuffle.service.port", 0)
43-
44-
yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
45-
val d = new File(dir)
46-
if (d.exists()) {
47-
FileUtils.deleteDirectory(d)
48-
}
49-
FileUtils.forceMkdir(d)
50-
logInfo(s"creating yarn.nodemanager.local-dirs: $d")
51-
}
47+
val localDir = Utils.createTempDir()
48+
yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
5249
}
5350

5451
var s1: YarnShuffleService = null
@@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
234231
s3.initializeApplication(app2Data)
235232
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
236233
s3.stop()
234+
}
235+
236+
test("get correct recovery path") {
237+
// Test recovery path is set outside the shuffle service, this is to simulate NM recovery
238+
// enabled scenario, where recovery path will be set by yarn.
239+
s1 = new YarnShuffleService
240+
val recoveryPath = new Path(Utils.createTempDir().toURI)
241+
s1.setRecoveryPath(recoveryPath)
242+
243+
s1.init(yarnConfig)
244+
s1._recoveryPath should be (recoveryPath)
245+
s1.stop()
237246

247+
// Test recovery path is set inside the shuffle service, this will be happened when NM
248+
// recovery is not enabled or there's no NM recovery (Hadoop 2.5-).
249+
s2 = new YarnShuffleService
250+
s2.init(yarnConfig)
251+
// The expected value must sit inside the parenthesis opened on the `should be` line. If `(`
// starts the next line instead, Scala ends the statement after `be` and nothing is asserted.
s2._recoveryPath should be (
252+
new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
253+
s2.stop()
238254
}
239255

240-
}
256+
test("moving recovery file form NM local dir to recovery path") {
257+
// This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move
258+
// old recovery file to the new path to keep compatibility
259+
260+
// Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
261+
// dir.
262+
s1 = new YarnShuffleService
263+
s1.init(yarnConfig)
264+
val app1Id = ApplicationId.newInstance(0, 1)
265+
val app1Data: ApplicationInitializationContext =
266+
new ApplicationInitializationContext("user", app1Id, null)
267+
s1.initializeApplication(app1Data)
268+
val app2Id = ApplicationId.newInstance(0, 2)
269+
val app2Data: ApplicationInitializationContext =
270+
new ApplicationInitializationContext("user", app2Id, null)
271+
s1.initializeApplication(app2Data)
272+
273+
val execStateFile = s1.registeredExecutorFile
274+
execStateFile should not be (null)
275+
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
276+
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
277+
278+
val blockHandler = s1.blockHandler
279+
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
280+
ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
281+
282+
blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
283+
blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
284+
ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
285+
be (Some(shuffleInfo1))
286+
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
287+
be (Some(shuffleInfo2))
288+
289+
assert(execStateFile.exists(), s"$execStateFile did not exist")
290+
291+
s1.stop()
292+
293+
// Simulate s2 is running on Hadoop 2.5+ with NM recovery is enabled.
294+
assert(execStateFile.exists())
295+
val recoveryPath = new Path(Utils.createTempDir().toURI)
296+
s2 = new YarnShuffleService
297+
s2.setRecoveryPath(recoveryPath)
298+
s2.init(yarnConfig)
299+
300+
val execStateFile2 = s2.registeredExecutorFile
301+
recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
302+
eventually(timeout(10 seconds), interval(5 millis)) {
303+
assert(!execStateFile.exists())
304+
}
305+
306+
val handler2 = s2.blockHandler
307+
val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
308+
309+
// now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
310+
// during the restart
311+
// Since recovery file is got from old path, so the previous state should be stored.
312+
s2.initializeApplication(app1Data)
313+
s2.stopApplication(new ApplicationTerminationContext(app2Id))
314+
ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
315+
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
316+
317+
s2.stop()
318+
}
319+
}

0 commit comments

Comments
 (0)