Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -457,18 +457,20 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
throws IOException {
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
if (db != null) {
DBIterator itr = db.iterator();
Copy link
Contributor Author

@LuciferYang LuciferYang Aug 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods reloadActiveAppAttemptsPathInfo and reloadFinalizedAppAttemptsShuffleMergeInfo should exist only in Spark 3.4. Please let me know if this PR needs to be backported to other branches.

itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_KEY_PREFIX)) {
break;
try (DBIterator itr = db.iterator()) {
itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_KEY_PREFIX)) {
break;
}
AppExecId id = parseDbAppExecKey(key);
logger.info("Reloading registered executors: " + id.toString());
ExecutorShuffleInfo shuffleInfo =
mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
registeredExecutors.put(id, shuffleInfo);
}
AppExecId id = parseDbAppExecKey(key);
logger.info("Reloading registered executors: " + id.toString());
ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
registeredExecutors.put(id, shuffleInfo);
}
}
return registeredExecutors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,39 +909,40 @@ void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
if (db != null) {
DBIterator itr = db.iterator();
itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
break;
}
AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class);
logger.debug("Reloading Application paths info for application {}", appAttemptId);
appsShuffleInfo.compute(appAttemptId.appId,
(appId, existingAppShuffleInfo) -> {
if (existingAppShuffleInfo == null ||
existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
if (existingAppShuffleInfo != null) {
AppAttemptId existingAppAttemptId = new AppAttemptId(
existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId);
try {
// Add the former outdated DB key to deletion list
dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId));
} catch (IOException e) {
logger.error("Failed to get the DB key for {}", existingAppAttemptId, e);
try (DBIterator itr = db.iterator()) {
itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
break;
}
AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class);
logger.debug("Reloading Application paths info for application {}", appAttemptId);
appsShuffleInfo.compute(appAttemptId.appId,
(appId, existingAppShuffleInfo) -> {
if (existingAppShuffleInfo == null ||
existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
if (existingAppShuffleInfo != null) {
AppAttemptId existingAppAttemptId = new AppAttemptId(
existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId);
try {
// Add the former outdated DB key to deletion list
dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId));
} catch (IOException e) {
logger.error("Failed to get the DB key for {}", existingAppAttemptId, e);
}
}
return new AppShuffleInfo(
appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
} else {
// Add the current DB key to deletion list as it is outdated
dbKeysToBeRemoved.add(entry.getKey());
return existingAppShuffleInfo;
}
return new AppShuffleInfo(
appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
} else {
// Add the current DB key to deletion list as it is outdated
dbKeysToBeRemoved.add(entry.getKey());
return existingAppShuffleInfo;
}
});
});
}
}
}
return dbKeysToBeRemoved;
Expand All @@ -954,41 +955,44 @@ List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
List<byte[]> reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws IOException {
List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
if (db != null) {
DBIterator itr = db.iterator();
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
break;
}
AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key);
logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId);
AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) {
appShuffleInfo.shuffles.compute(partitionId.shuffleId,
(shuffleId, existingMergePartitionInfo) -> {
if (existingMergePartitionInfo == null ||
existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) {
if (existingMergePartitionInfo != null) {
AppAttemptShuffleMergeId appAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId,
shuffleId, existingMergePartitionInfo.shuffleMergeId);
try{
dbKeysToBeRemoved.add(
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
} catch (Exception e) {
logger.error("Error getting the DB key for {}", appAttemptShuffleMergeId, e);
try (DBIterator itr = db.iterator()) {
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
break;
}
AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key);
logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId);
AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) {
appShuffleInfo.shuffles.compute(partitionId.shuffleId,
(shuffleId, existingMergePartitionInfo) -> {
if (existingMergePartitionInfo == null ||
existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) {
if (existingMergePartitionInfo != null) {
AppAttemptShuffleMergeId appAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(
appShuffleInfo.appId, appShuffleInfo.attemptId,
shuffleId, existingMergePartitionInfo.shuffleMergeId);
try{
dbKeysToBeRemoved.add(
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
} catch (Exception e) {
logger.error("Error getting the DB key for {}",
appAttemptShuffleMergeId, e);
}
}
return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
} else {
dbKeysToBeRemoved.add(entry.getKey());
return existingMergePartitionInfo;
}
return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
} else {
dbKeysToBeRemoved.add(entry.getKey());
return existingMergePartitionInfo;
}
});
} else {
dbKeysToBeRemoved.add(entry.getKey());
});
} else {
dbKeysToBeRemoved.add(entry.getKey());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,18 +341,19 @@ private void loadSecretsFromDb() throws IOException {
logger.info("Recovery location is: " + secretsFile.getPath());
if (db != null) {
logger.info("Going to reload spark shuffle data");
DBIterator itr = db.iterator();
itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
break;
try (DBIterator itr = db.iterator()) {
itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
break;
}
String id = parseDbAppKey(key);
ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
logger.info("Reloading tokens for app: " + id);
secretManager.registerApp(id, secret);
}
String id = parseDbAppKey(key);
ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
logger.info("Reloading tokens for app: " + id);
secretManager.registerApp(id, secret);
}
}
}
Expand Down