diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index b25e48a164e6..3c61bb1af5f2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -22,6 +22,7 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import com.codahale.metrics.Gauge;
@@ -60,10 +61,17 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   private final OneForOneStreamManager streamManager;
   private final ShuffleMetrics metrics;
 
-  public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
+  public ExternalShuffleBlockHandler(
+      TransportConf conf, File registeredExecutorFile)
+    throws IOException {
+    this(conf, registeredExecutorFile, null, -1L);
+  }
+
+  public ExternalShuffleBlockHandler(
+      TransportConf conf, File registeredExecutorFile, String[] recoveryDirs, long checkInterval)
     throws IOException {
     this(new OneForOneStreamManager(),
-      new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
+      new ExternalShuffleBlockResolver(conf, registeredExecutorFile, recoveryDirs, checkInterval));
   }
 
   /** Enables mocking out the StreamManager and BlockManager. */
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 0b7a27402369..2675270d5ff6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -24,6 +24,9 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -32,6 +35,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -86,15 +90,38 @@ public class ExternalShuffleBlockResolver {
 
   private final TransportConf conf;
 
+  private AtomicBoolean registerExecutorFailed = new AtomicBoolean(false);
+
+  private String[] recoveryDirs;
+
   @VisibleForTesting
-  final File registeredExecutorFile;
+  File registeredExecutorFile;
   @VisibleForTesting
-  final DB db;
+  DB db;
 
   private final List<String> knownManagers = Arrays.asList(
     "org.apache.spark.shuffle.sort.SortShuffleManager",
     "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
 
+  public ExternalShuffleBlockResolver(
+      TransportConf conf,
+      File registeredExecutorFile,
+      String[] recoveryDirs,
+      long checkInterval) throws IOException {
+    this(conf, registeredExecutorFile,
+      Executors.newSingleThreadExecutor(
+        NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
+    this.recoveryDirs = recoveryDirs;
+    if (this.recoveryDirs != null) {
+      Preconditions.checkArgument(checkInterval > 0,
+        "Check interval of multiple directories recovery should be a positive number");
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+        NettyUtils.createThreadFactory("spark-shuffle-disk-checker"));
+      executor.scheduleAtFixedRate(this::checkRegisteredExecutorFileAndRecover,
+        checkInterval, checkInterval, TimeUnit.MILLISECONDS);
+    }
+  }
+
   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
       throws IOException {
     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -138,6 +165,39 @@ public int getRegisteredExecutorsSize() {
     return executors.size();
   }
 
+  public File saveNewRegisteredExecutorFile() {
+    String recoveryFileName = registeredExecutorFile.getName();
+    for (String path : recoveryDirs) {
+      File newRegisteredExecutorFile = new File(path, recoveryFileName);
+      if (!newRegisteredExecutorFile.exists()) {
+        try {
+          DB newDb = LevelDBProvider.initLevelDB(newRegisteredExecutorFile, CURRENT_VERSION, mapper);
+          for (Map.Entry<AppExecId, ExecutorShuffleInfo> entry : executors.entrySet()) {
+            byte[] key = dbAppExecKey(entry.getKey());
+            byte[] value = mapper.writeValueAsString(entry.getValue()).getBytes(StandardCharsets.UTF_8);
+            newDb.put(key, value);
+          }
+          // Close the old db and clean up the old registeredExecutorFile
+          if (this.db != null) {
+            try {
+              this.db.close();
+              JavaUtils.deleteRecursively(registeredExecutorFile);
+            } catch (IOException e) {
+              logger.warn("Failed to clean up old registered executors file at " + registeredExecutorFile, e);
+            }
+          }
+          this.db = newDb;
+          this.registeredExecutorFile = newRegisteredExecutorFile;
+          return newRegisteredExecutorFile;
+        } catch (Exception e) {
+          logger.error("Exception occurred while saving registered executors info to new file " + newRegisteredExecutorFile, e);
+          // continue with the next recovery directory
+        }
+      }
+    }
+    return null;
+  }
+
   /** Registers a new Executor with all the configuration we need to find its shuffle files. */
   public void registerExecutor(
       String appId,
@@ -157,6 +217,7 @@ public void registerExecutor(
       }
     } catch (Exception e) {
       logger.error("Error saving registered executors", e);
+      registerExecutorFailed.getAndSet(true);
     }
     executors.put(fullId, executorInfo);
   }
@@ -274,6 +335,22 @@ public boolean accept(File dir, String name) {
     }
   }
 
+  /**
+   * Check whether the registeredExecutorFile is unhealthy and try to recover. During recovery,
+   * it creates a new registeredExecutorFile and saves the latest executors info into it.
+   */
+  private void checkRegisteredExecutorFileAndRecover() {
+    if (this.registeredExecutorFile == null) {
+      return;
+    } else if (!registeredExecutorFile.exists() ||
+        !registeredExecutorFile.canRead() ||
+        !registeredExecutorFile.canWrite()) {
+      saveNewRegisteredExecutorFile();
+    } else if (registerExecutorFailed.getAndSet(false)) {
+      saveNewRegisteredExecutorFile();
+    }
+  }
+
   /**
    * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
    * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
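The resolver changes above follow a simple pattern: a single-threaded scheduler periodically probes the recovery DB file, and if the file is missing, unreadable, unwritable, or a previous write failed, the in-memory executor map is re-persisted into another candidate directory. The sketch below is illustrative only and is not part of the patch: it uses a plain properties file instead of LevelDB, and the class and member names (DirectoryFailover, register, snapshot file handling) are made up.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: periodic health check of a state file with failover to other directories. */
public class DirectoryFailover {
  private final Map<String, String> state = new ConcurrentHashMap<>();  // stands in for `executors`
  private final AtomicBoolean writeFailed = new AtomicBoolean(false);   // stands in for `registerExecutorFailed`
  private final String[] recoveryDirs;                                  // candidate directories on different disks
  private volatile File stateFile;                                      // stands in for `registeredExecutorFile`

  public DirectoryFailover(File initialFile, String[] recoveryDirs, long checkIntervalMs) {
    this.stateFile = initialFile;
    this.recoveryDirs = recoveryDirs;
    ScheduledExecutorService checker =
      Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "disk-checker"));
    checker.scheduleAtFixedRate(this::checkAndRecover, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS);
  }

  /** Loosely mirrors registerExecutor: remember the entry, note a failed durable write for the checker. */
  public void register(String key, String value) {
    state.put(key, value);
    Properties props = new Properties();
    props.putAll(state);
    try (FileOutputStream out = new FileOutputStream(stateFile)) {
      props.store(out, "state snapshot");
    } catch (IOException e) {
      writeFailed.set(true);  // the periodic checker will fail over to another directory
    }
  }

  /** Loosely mirrors checkRegisteredExecutorFileAndRecover: unhealthy file or failed write triggers failover. */
  private void checkAndRecover() {
    if (stateFile == null) {
      return;
    }
    boolean unhealthy = !stateFile.exists() || !stateFile.canRead() || !stateFile.canWrite();
    if (unhealthy || writeFailed.getAndSet(false)) {
      saveToNewDirectory();
    }
  }

  /** Loosely mirrors saveNewRegisteredExecutorFile: dump the in-memory map into the next healthy directory. */
  private void saveToNewDirectory() {
    String name = stateFile.getName();
    for (String dir : recoveryDirs) {
      File candidate = new File(dir, name);
      if (candidate.equals(stateFile)) {
        continue;  // skip the directory we are failing away from
      }
      Properties props = new Properties();
      props.putAll(state);
      try (FileOutputStream out = new FileOutputStream(candidate)) {
        props.store(out, "recovered state snapshot");
        stateFile = candidate;  // switch over once the snapshot is durable
        return;
      } catch (IOException e) {
        // this directory is also bad; try the next one
      }
    }
  }
}
```

The actual patch additionally validates the check interval with Preconditions and only fails over to directories where a recovery file does not already exist.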
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 7e8d3b2bc3ba..9167db313c2a 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -38,7 +39,9 @@
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.*;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.LevelDBProvider;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -84,7 +87,8 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
   private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
 
-  private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
+  @VisibleForTesting
+  static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
   private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
 
   // Whether failure during service initialization should stop the NM.
@@ -92,6 +96,19 @@ public class YarnShuffleService extends AuxiliaryService {
   static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
   private static final boolean DEFAULT_STOP_ON_FAILURE = false;
 
+  // Whether to enable multiple-directory recovery of registered executors info.
+  // If enabled, both _recoveryPath and yarn.nodemanager.local-dirs are used for
+  // recovery, and they are expected to be mounted on different disks.
+  @VisibleForTesting
+  static final String REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY =
+    "spark.yarn.shuffle.registeredExecutors.multipleDirsRecovery";
+  private static final boolean DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY = false;
+  // Disk check interval in milliseconds
+  @VisibleForTesting
+  static final String REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL =
+    "spark.yarn.shuffle.registeredExecutorsFile.checkInterval";
+  private static final long DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL = 60 * 1000;
+
   // just for testing when you want to find an open port
   @VisibleForTesting
   static int boundPort = -1;
@@ -157,18 +174,38 @@ protected void serviceInit(Configuration conf) throws Exception {
 
     boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
 
+    // Whether we should check multiple directories for recovery
+    boolean multiDirsRecovery = conf.getBoolean(REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY,
+      DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY);
+
     try {
       // In case this NM was killed while there were running spark applications, we need to restore
       // lost state for the existing executors. We look for an existing file in the NM's local dirs.
       // If we don't find one, then we choose a file to use to save the state next time.  Even if
       // an application was stopped while the NM was down, we expect yarn to call stopApplication()
       // when it comes back
+      TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
+      // Directories for recovery
+      String[] recoveryDirs = null;
+      // The interval for checking the registered executor file and failing over if it is broken.
+      // Only used with multiple-directory recovery.
+      long checkInterval = -1;
+
       if (_recoveryPath != null) {
-        registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+        if (multiDirsRecovery) {
+          List<String> recoveryDirList = Lists.newArrayList();
+          recoveryDirList.add(_recoveryPath.toUri().getPath());
+          Collection<String> localDirs = conf.getTrimmedStringCollection(YarnConfiguration.NM_LOCAL_DIRS);
+          recoveryDirList.addAll(localDirs);
+          recoveryDirs = recoveryDirList.toArray(new String[0]);
+          registeredExecutorFile = findRegisteredExecutorFile(recoveryDirs, RECOVERY_FILE_NAME);
+          checkInterval = conf.getLong(REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL,
+            DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL);
+        } else {
+          registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+        }
       }
-
-      TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
-      blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
+      blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile, recoveryDirs, checkInterval);
 
       // If authentication is enabled, set up the shuffle server to use a
       // special RPC handler that filters out unauthenticated fetch requests
@@ -395,6 +432,40 @@ protected File initRecoveryDb(String dbName) {
     return new File(_recoveryPath.toUri().getPath(), dbName);
   }
 
+  public static File findRegisteredExecutorFile(
+      String[] recoveryPaths, String recoveryFileName) {
+    File newFile = null;
+    if (recoveryPaths != null) {
+      File latestFile = null;
+      for (String path : recoveryPaths) {
+        File registeredExecutorFile = new File(path, recoveryFileName);
+        if (registeredExecutorFile.exists()) {
+          if (registeredExecutorFile.canRead()
+              && registeredExecutorFile.canWrite()) {
+            if (latestFile == null || registeredExecutorFile.lastModified() > latestFile.lastModified()) {
+              latestFile = registeredExecutorFile;
+            }
+          }
+        } else {
+          if (newFile == null) {
+            try {
+              JavaUtils.deleteRecursively(registeredExecutorFile);
+              newFile = registeredExecutorFile;
+            } catch (IOException e) {
+              logger.warn("Failed to delete old registered executor file " + registeredExecutorFile, e);
+            }
+          }
+        }
+      }
+      if (latestFile == null) {
+        return newFile;
+      }
+      return latestFile;
+    }
+    return null;
+  }
+
+
   /**
    * Simply encodes an application ID.
    */
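For reference, the two keys introduced above are read from the NodeManager's Hadoop configuration (typically yarn-site.xml). Below is a minimal sketch of setting them programmatically; it assumes only the key names and the 60 * 1000 ms default shown in the patch, and the class name and directory values are illustrative, not part of the change.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/** Illustrative only: the NM configuration the shuffle service would read at serviceInit. */
public class MultiDirRecoveryConfigExample {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Key names match the constants added in YarnShuffleService; values are examples.
    conf.setBoolean("spark.yarn.shuffle.registeredExecutors.multipleDirsRecovery", true);
    // Probe the registered-executor file every 30 seconds (the patch's default is 60 * 1000 ms).
    conf.setLong("spark.yarn.shuffle.registeredExecutorsFile.checkInterval", 30 * 1000L);
    // yarn.nodemanager.local-dirs supplies the extra candidate directories, ideally on other disks.
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, "/disk1/yarn/local", "/disk2/yarn/local");
    System.out.println("multiDirsRecovery enabled; candidate recovery dirs: " +
      conf.get(YarnConfiguration.NM_LOCAL_DIRS));
  }
}
```

With multipleDirsRecovery left at its default of false, serviceInit falls back to the existing single-directory initRecoveryDb path, as the else branch above shows.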
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
index 1fed2562fcad..7882cdc189b1 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
@@ -44,6 +44,11 @@ object ShuffleTestAccessor {
     Option(resolver.executors.get(id))
   }
 
+  def getAllExecutorInfos(resolver: ExternalShuffleBlockResolver):
+      ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
+    resolver.executors
+  }
+
   def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
     resolver.registeredExecutorFile
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 268f4bd13f6c..584e3314055a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -381,4 +381,76 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s1.secretsFile should be (null)
   }
 
+  test("shuffle service should be robust to disk errors if multiple disk recovery is enabled") {
+    yarnConfig.set(YarnShuffleService.REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY, "true")
+    yarnConfig.set(YarnShuffleService.REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL, "500")
+    s1 = new YarnShuffleService
+    val recoveryPath = new Path(recoveryLocalDir.toURI.getPath)
+    s1.setRecoveryPath(recoveryPath)
+    s1.init(yarnConfig)
+
+    val appId1 = ApplicationId.newInstance(0, 1)
+    val app1 = makeAppInfo("user1", appId1)
+    val appId2 = ApplicationId.newInstance(0, 2)
+    val app2 = makeAppInfo("user1", appId2)
+
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/foo2", "/bar2"), 3, SORT_MANAGER)
+    val shuffleInfo3 = new ExecutorShuffleInfo(Array("/foo3", "/bar3"), 3, SORT_MANAGER)
+
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(s1.blockHandler)
+    val execStateFile1 = ShuffleTestAccessor.registeredExecutorFile(blockResolver)
+    assert(execStateFile1.getParentFile === recoveryLocalDir)
+
+    s1.initializeApplication(app1)
+    blockResolver.registerExecutor(appId1.toString, "exec1", shuffleInfo1)
+    val getExecInfo1 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver)
+    getExecInfo1 should be (Some(shuffleInfo1))
+
+    // Simulate a disk error
+    recoveryLocalDir.setWritable(false)
+    execStateFile1.setWritable(false)
+    Thread.sleep(2000)
+    try {
+      // Test whether failover of the registeredExecutorFile happens when a disk error is encountered
+      val execStateFile2 = ShuffleTestAccessor.registeredExecutorFile(blockResolver)
+      assert(execStateFile1 !== execStateFile2)
+      val getExecInfo2 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver)
+      getExecInfo2 should be (Some(shuffleInfo1))
+
+      // Assume a restart happens here.
+      // Test whether we can find the last available registeredExecutorFile after a restart
+      val recoveryDirs = Seq(recoveryPath.toUri.getPath).toArray ++
+        yarnConfig.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)
+      val findResult = YarnShuffleService.findRegisteredExecutorFile(
+        recoveryDirs, YarnShuffleService.RECOVERY_FILE_NAME)
+      assert(findResult === execStateFile2)
+
+      // Test whether the registeredExecutors reloaded from leveldb are the same as the ones
+      // before the restart
+      val currentDB = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver)
+      val reloadExecutors1 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB)
+      reloadExecutors1 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver))
+
+      blockResolver.registerExecutor(appId1.toString, "exec2", shuffleInfo2)
+      val getExecInfo3 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec2", blockResolver)
+      getExecInfo3 should be (Some(shuffleInfo2))
+      val reloadExecutors2 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB)
+      reloadExecutors2 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver))
+
+      s1.initializeApplication(app2)
+      blockResolver.registerExecutor(appId2.toString, "exec1", shuffleInfo3)
+      val getExecInfo4 = ShuffleTestAccessor.getExecutorInfo(appId2, "exec1", blockResolver)
+      getExecInfo4 should be (Some(shuffleInfo3))
+      val reloadExecutors3 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB)
+      reloadExecutors3 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver))
+
+    } finally {
+      s1.stopApplication(new ApplicationTerminationContext(appId1))
+      s1.stopApplication(new ApplicationTerminationContext(appId2))
+      s1.stop()
+      recoveryLocalDir.setWritable(true)
+      execStateFile1.setWritable(true)
+    }
+  }
 }