ExternalShuffleBlockHandler.java
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.codahale.metrics.Gauge;
@@ -60,10 +61,17 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
private final OneForOneStreamManager streamManager;
private final ShuffleMetrics metrics;

public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
public ExternalShuffleBlockHandler(
TransportConf conf, File registeredExecutorFile)
throws IOException {
this(conf, registeredExecutorFile, null, -1L);
}

public ExternalShuffleBlockHandler(
TransportConf conf, File registeredExecutorFile, String[] recoveryDirs, long checkInterval)
throws IOException {
this(new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
new ExternalShuffleBlockResolver(conf, registeredExecutorFile, recoveryDirs, checkInterval));
}

/** Enables mocking out the StreamManager and BlockManager. */
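Not part of the diff: a minimal sketch of how the new four-argument ExternalShuffleBlockHandler constructor added above might be wired up. The recovery file path, directory list, check interval, and the helper class name are hypothetical values chosen for illustration only.

import java.io.File;
import java.io.IOException;

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.TransportConf;

public class HandlerWiringSketch {
  // Builds a handler whose recovery DB can fail over across several candidate directories.
  static ExternalShuffleBlockHandler create(TransportConf conf) throws IOException {
    File recoveryFile = new File("/tmp/recovery/registeredExecutors.ldb"); // hypothetical path
    String[] recoveryDirs = {"/tmp/recovery", "/data1/yarn/local"};        // hypothetical dirs
    long checkIntervalMs = 60 * 1000L;                                     // check once a minute
    return new ExternalShuffleBlockHandler(conf, recoveryFile, recoveryDirs, checkIntervalMs);
  }
}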
ExternalShuffleBlockResolver.java
@@ -24,6 +24,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -32,6 +35,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -86,15 +90,38 @@ public class ExternalShuffleBlockResolver {

private final TransportConf conf;

private AtomicBoolean registerExecutorFailed = new AtomicBoolean(false);

private String[] recoveryDirs;

@VisibleForTesting
final File registeredExecutorFile;
File registeredExecutorFile;
@VisibleForTesting
final DB db;
DB db;

private final List<String> knownManagers = Arrays.asList(
"org.apache.spark.shuffle.sort.SortShuffleManager",
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");

public ExternalShuffleBlockResolver(
TransportConf conf,
File registeredExecutorFile,
String[] recoveryDirs,
long checkInterval) throws IOException {
this(conf, registeredExecutorFile,
Executors.newSingleThreadExecutor(
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
this.recoveryDirs = recoveryDirs;
if (this.recoveryDirs != null) {
Preconditions.checkArgument(checkInterval > 0,
"Check interval of multiple directories recovery should be a positive number");
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
NettyUtils.createThreadFactory("spark-shuffle-disk-checker"));
executor.scheduleAtFixedRate(this::checkRegisteredExecutorFileAndRecover,
checkInterval, checkInterval, TimeUnit.MILLISECONDS);
}
}

public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
throws IOException {
this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -138,6 +165,39 @@ public int getRegisteredExecutorsSize() {
return executors.size();
}

public File saveNewRegisteredExecutorFile() {
String recoveryFileName = registeredExecutorFile.getName();
for (String path : recoveryDirs) {
File newRegisteredExecutorFile = new File(path, recoveryFileName);
if (!newRegisteredExecutorFile.exists()) {
try {
DB newDb = LevelDBProvider.initLevelDB(newRegisteredExecutorFile, CURRENT_VERSION, mapper);
for (Map.Entry<AppExecId, ExecutorShuffleInfo> entry : executors.entrySet()) {
byte[] key = dbAppExecKey(entry.getKey());
byte[] value = mapper.writeValueAsString(entry.getValue()).getBytes(StandardCharsets.UTF_8);
newDb.put(key, value);
}
// Close the old db and clean up the old registeredExecutorFile
if (this.db != null) {
try {
this.db.close();
JavaUtils.deleteRecursively(registeredExecutorFile);
} catch (IOException e) {
logger.warn("Failed to clean up old registered executors file at " + registeredExecutorFile, e);
}
}
this.db = newDb;
this.registeredExecutorFile = newRegisteredExecutorFile;
return newRegisteredExecutorFile;
} catch (Exception e) {
logger.error("Exception occurred while saving registered executors info to new file " + newRegisteredExecutorFile, e);
// continue
}
}
}
return null;
}

/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
String appId,
@@ -157,6 +217,7 @@ public void registerExecutor(
}
} catch (Exception e) {
logger.error("Error saving registered executors", e);
registerExecutorFailed.getAndSet(true);
}
executors.put(fullId, executorInfo);
}
@@ -274,6 +335,22 @@ public boolean accept(File dir, String name) {
}
}

/**
* Check whether the registeredExecutorFile is unhealthy, and try to recover. During recovery,
* it creates a new registeredExecutorFile and saves the latest executor info into it.
*/
private void checkRegisteredExecutorFileAndRecover() {
if (this.registeredExecutorFile == null) {
return;
} else if (!registeredExecutorFile.exists() ||
!registeredExecutorFile.canRead() ||
!registeredExecutorFile.canWrite()) {
saveNewRegisteredExecutorFile();
} else if (registerExecutorFailed.getAndSet(false)) {
saveNewRegisteredExecutorFile();
}
}

/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
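Not part of the diff: a self-contained sketch of the periodic check-and-fail-over pattern that the resolver changes above implement, i.e. a single-thread scheduled executor probing file health and re-creating the state file in the next candidate directory. The class, method names, and interval are made up; the real code re-creates the LevelDB and copies the in-memory executor map rather than creating an empty file.

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RecoveryFileWatcherSketch {
  private final String[] candidateDirs;
  private volatile File stateFile;

  RecoveryFileWatcherSketch(File initial, String[] candidateDirs, long intervalMs) {
    this.stateFile = initial;
    this.candidateDirs = candidateDirs;
    ScheduledExecutorService checker = Executors.newSingleThreadScheduledExecutor();
    checker.scheduleAtFixedRate(this::checkAndRecover, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  private void checkAndRecover() {
    File current = stateFile;
    if (current.exists() && current.canRead() && current.canWrite()) {
      return; // current file is healthy, nothing to do
    }
    for (String dir : candidateDirs) {
      File candidate = new File(dir, current.getName());
      if (!candidate.exists()) {
        try {
          if (candidate.createNewFile()) {   // the real code would rebuild the LevelDB here
            stateFile = candidate;
            return;
          }
        } catch (IOException e) {
          // this directory is also bad; try the next one
        }
      }
    }
  }
}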
YarnShuffleService.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@@ -38,7 +39,9 @@
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.*;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LevelDBProvider;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
@@ -84,14 +87,28 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;

private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
@VisibleForTesting
static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";

// Whether failure during service initialization should stop the NM.
@VisibleForTesting
static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
private static final boolean DEFAULT_STOP_ON_FAILURE = false;

// Whether to enable multiple-directory recovery of registered executor info.
// If enabled, both _recoveryPath and yarn.nodemanager.local-dirs are used for
// recovery, and they are expected to be mounted on different disks.
@VisibleForTesting
static final String REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY =
"spark.yarn.shuffle.registeredExecutors.multipleDirsRecovery";
private static final boolean DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY = false;
// Disk check interval in milliseconds
@VisibleForTesting
static final String REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL =
"spark.yarn.shuffle.registeredExecutorsFile.checkInterval";
private static final long DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL = 60 * 1000;

// just for testing when you want to find an open port
@VisibleForTesting
static int boundPort = -1;
@@ -157,18 +174,38 @@ protected void serviceInit(Configuration conf) throws Exception {

boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

// Whether to check multiple directories for recovery
boolean multiDirsRecovery = conf.getBoolean(REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY,
DEFAULT_REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY);

try {
// In case this NM was killed while there were running spark applications, we need to restore
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
// If we don't find one, then we choose a file to use to save the state next time. Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
// Directories for recovery
String[] recoveryDirs = null;
// The interval for checking the registered executor file and failing over if it is broken.
// Only used with multiple-directory recovery.
long checkInterval = -1;

if (_recoveryPath != null) {
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
if (multiDirsRecovery) {
List<String> recoveryDirList = Lists.newArrayList();
recoveryDirList.add(_recoveryPath.toUri().getPath());
Collection<String> localDirs = conf.getTrimmedStringCollection(YarnConfiguration.NM_LOCAL_DIRS);
recoveryDirList.addAll(localDirs);
recoveryDirs = recoveryDirList.toArray(new String[0]);
registeredExecutorFile = findRegisteredExecutorFile(recoveryDirs, RECOVERY_FILE_NAME);
checkInterval = conf.getLong(REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL,
DEFAULT_REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL);
} else {
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
}
}

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile, recoveryDirs, checkInterval);

// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
@@ -395,6 +432,40 @@ protected File initRecoveryDb(String dbName) {
return new File(_recoveryPath.toUri().getPath(), dbName);
}

public static File findRegisteredExecutorFile(
String[] recoveryPaths, String recoveryFileName) {
File newFile = null;
if (recoveryPaths != null) {
File latestFile = null;
for (String path : recoveryPaths) {
File registeredExecutorFile = new File(path, recoveryFileName);
if (registeredExecutorFile.exists()) {
if (registeredExecutorFile.canRead()
&& registeredExecutorFile.canWrite()) {
if (latestFile == null || registeredExecutorFile.lastModified() > latestFile.lastModified()) {
latestFile = registeredExecutorFile;
}
}
} else {
if (newFile == null) {
try {
JavaUtils.deleteRecursively(registeredExecutorFile);
newFile = registeredExecutorFile;
} catch (IOException e) {
logger.warn("Failed to delete old registered executor file " + registeredExecutorFile, e);
}
}
}
}
if (latestFile == null) {
return newFile;
}
return latestFile;
}
return null;
}


/**
* Simply encodes an application ID.
*/
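Not part of the diff: a sketch of how the two new settings introduced above might be enabled in the NodeManager's Hadoop Configuration. The key names come from this patch; the values and the helper class are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;

public class ShuffleServiceConfigSketch {
  // Turns on multiple-directory recovery and sets the disk check interval (values illustrative).
  static Configuration withMultiDirRecovery(Configuration conf) {
    conf.setBoolean("spark.yarn.shuffle.registeredExecutors.multipleDirsRecovery", true);
    conf.setLong("spark.yarn.shuffle.registeredExecutorsFile.checkInterval", 60 * 1000L);
    return conf;
  }
}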
ShuffleTestAccessor.scala
@@ -44,6 +44,11 @@ object ShuffleTestAccessor {
Option(resolver.executors.get(id))
}

def getAllExecutorInfos(resolver: ExternalShuffleBlockResolver):
ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
resolver.executors
}

def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
resolver.registeredExecutorFile
}
YarnShuffleServiceSuite.scala
@@ -381,4 +381,76 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s1.secretsFile should be (null)
}

test("shuffle service should be robust when disk error if multiple disk recovery enabled") {
yarnConfig.set(YarnShuffleService.REGISTERED_EXECUTORS_MULTIPLE_DIRECTORIES_RECOVERY, "true")
yarnConfig.set(YarnShuffleService.REGISTERED_EXECUTORS_FILE_CHECK_INTERVAL, "500")
s1 = new YarnShuffleService
val recoveryPath = new Path(recoveryLocalDir.toURI.getPath)
s1.setRecoveryPath(recoveryPath)
s1.init(yarnConfig)

val appId1 = ApplicationId.newInstance(0, 1)
val app1 = makeAppInfo("user1", appId1)
val appId2 = ApplicationId.newInstance(0, 2)
val app2 = makeAppInfo("user1", appId2)

val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/foo2", "/bar2"), 3, SORT_MANAGER)
val shuffleInfo3 = new ExecutorShuffleInfo(Array("/foo3", "/bar3"), 3, SORT_MANAGER)

val blockResolver = ShuffleTestAccessor.getBlockResolver(s1.blockHandler)
val execStateFile1 = ShuffleTestAccessor.registeredExecutorFile(blockResolver)
assert(execStateFile1.getParentFile === recoveryLocalDir)

s1.initializeApplication(app1)
blockResolver.registerExecutor(appId1.toString, "exec1", shuffleInfo1)
val getExecInfo1 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver)
getExecInfo1 should be (Some(shuffleInfo1))

// Simulate disk error
recoveryLocalDir.setWritable(false)
execStateFile1.setWritable(false)
Thread.sleep(2000)
try {
// Test whether the failover of registeredExecutorFile happens when a disk error is encountered
val execStateFile2 = ShuffleTestAccessor.registeredExecutorFile(blockResolver)
assert(execStateFile1 !== execStateFile2)
val getExecInfo2 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec1", blockResolver)
getExecInfo2 should be (Some(shuffleInfo1))

// Assume a restart happens here.
// Test whether we can find the last available registeredExecutorFile after the restart
val recoveryDirs = Seq(recoveryPath.toUri.getPath).toArray ++
yarnConfig.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)
val findResult = YarnShuffleService.findRegisteredExecutorFile(
recoveryDirs, YarnShuffleService.RECOVERY_FILE_NAME)
assert(findResult === execStateFile2)

// Test whether the registeredExecutors reloaded from leveldb are the same as the ones
// before the restart
val currentDB = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver)
val reloadExecutors1 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB)
reloadExecutors1 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver))

blockResolver.registerExecutor(appId1.toString, "exec2", shuffleInfo2)
val getExecInfo3 = ShuffleTestAccessor.getExecutorInfo(appId1, "exec2", blockResolver)
getExecInfo3 should be(Some(shuffleInfo2))
val reloadExecutors2 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB)
reloadExecutors2 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver))

s1.initializeApplication(app2)
blockResolver.registerExecutor(appId2.toString, "exec1", shuffleInfo3)
val getExecInfo4 = ShuffleTestAccessor.getExecutorInfo(appId2, "exec1", blockResolver)
getExecInfo4 should be(Some(shuffleInfo3))
val reloadExecutors3 = ShuffleTestAccessor.reloadRegisteredExecutors(currentDB)
reloadExecutors3 should be (ShuffleTestAccessor.getAllExecutorInfos(blockResolver))

} finally {
s1.stopApplication(new ApplicationTerminationContext(appId1))
s1.stopApplication(new ApplicationTerminationContext(appId2))
s1.stop()
recoveryLocalDir.setWritable(true)
execStateFile1.setWritable(true)
}
}
}