diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index c3b7db15d21d9..3d18d20518410 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -457,18 +457,20 @@ static ConcurrentMap reloadRegisteredExecutors(D throws IOException { ConcurrentMap registeredExecutors = Maps.newConcurrentMap(); if (db != null) { - DBIterator itr = db.iterator(); - itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); - while (itr.hasNext()) { - Map.Entry e = itr.next(); - String key = new String(e.getKey(), StandardCharsets.UTF_8); - if (!key.startsWith(APP_KEY_PREFIX)) { - break; + try (DBIterator itr = db.iterator()) { + itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { + Map.Entry e = itr.next(); + String key = new String(e.getKey(), StandardCharsets.UTF_8); + if (!key.startsWith(APP_KEY_PREFIX)) { + break; + } + AppExecId id = parseDbAppExecKey(key); + logger.info("Reloading registered executors: " + id.toString()); + ExecutorShuffleInfo shuffleInfo = + mapper.readValue(e.getValue(), ExecutorShuffleInfo.class); + registeredExecutors.put(id, shuffleInfo); } - AppExecId id = parseDbAppExecKey(key); - logger.info("Reloading registered executors: " + id.toString()); - ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class); - registeredExecutors.put(id, shuffleInfo); } } return registeredExecutors; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 677adc76bffc5..b74830101ce6d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -909,39 +909,40 @@ void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException { List reloadActiveAppAttemptsPathInfo(DB db) throws IOException { List dbKeysToBeRemoved = new ArrayList<>(); if (db != null) { - DBIterator itr = db.iterator(); - itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); - while (itr.hasNext()) { - Map.Entry entry = itr.next(); - String key = new String(entry.getKey(), StandardCharsets.UTF_8); - if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) { - break; - } - AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key); - AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class); - logger.debug("Reloading Application paths info for application {}", appAttemptId); - appsShuffleInfo.compute(appAttemptId.appId, - (appId, existingAppShuffleInfo) -> { - if (existingAppShuffleInfo == null || - existingAppShuffleInfo.attemptId < appAttemptId.attemptId) { - if (existingAppShuffleInfo != null) { - AppAttemptId existingAppAttemptId = new AppAttemptId( - existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId); - try { - // Add the former outdated DB key to deletion list - dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId)); - } catch (IOException e) { - logger.error("Failed to get the DB key for {}", existingAppAttemptId, e); + try (DBIterator itr = db.iterator()) { + itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + String key = new String(entry.getKey(), StandardCharsets.UTF_8); + if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) { + break; + } + AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key); + AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class); + logger.debug("Reloading Application paths info for application {}", appAttemptId); + appsShuffleInfo.compute(appAttemptId.appId, + (appId, existingAppShuffleInfo) -> { + if (existingAppShuffleInfo == null || + existingAppShuffleInfo.attemptId < appAttemptId.attemptId) { + if (existingAppShuffleInfo != null) { + AppAttemptId existingAppAttemptId = new AppAttemptId( + existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId); + try { + // Add the former outdated DB key to deletion list + dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId)); + } catch (IOException e) { + logger.error("Failed to get the DB key for {}", existingAppAttemptId, e); + } } + return new AppShuffleInfo( + appAttemptId.appId, appAttemptId.attemptId, appPathsInfo); + } else { + // Add the current DB key to deletion list as it is outdated + dbKeysToBeRemoved.add(entry.getKey()); + return existingAppShuffleInfo; } - return new AppShuffleInfo( - appAttemptId.appId, appAttemptId.attemptId, appPathsInfo); - } else { - // Add the current DB key to deletion list as it is outdated - dbKeysToBeRemoved.add(entry.getKey()); - return existingAppShuffleInfo; - } - }); + }); + } } } return dbKeysToBeRemoved; @@ -954,41 +955,44 @@ List reloadActiveAppAttemptsPathInfo(DB db) throws IOException { List reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws IOException { List dbKeysToBeRemoved = new ArrayList<>(); if (db != null) { - DBIterator itr = db.iterator(); - itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); - while (itr.hasNext()) { - Map.Entry entry = itr.next(); - String key = new String(entry.getKey(), StandardCharsets.UTF_8); - if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) { - break; - } - AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key); - logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId); - AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId); - if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) { - appShuffleInfo.shuffles.compute(partitionId.shuffleId, - (shuffleId, existingMergePartitionInfo) -> { - if (existingMergePartitionInfo == null || - existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) { - if (existingMergePartitionInfo != null) { - AppAttemptShuffleMergeId appAttemptShuffleMergeId = - new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, - shuffleId, existingMergePartitionInfo.shuffleMergeId); - try{ - dbKeysToBeRemoved.add( - getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId)); - } catch (Exception e) { - logger.error("Error getting the DB key for {}", appAttemptShuffleMergeId, e); + try (DBIterator itr = db.iterator()) { + itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + String key = new String(entry.getKey(), StandardCharsets.UTF_8); + if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) { + break; + } + AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key); + logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId); + AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId); + if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) { + appShuffleInfo.shuffles.compute(partitionId.shuffleId, + (shuffleId, existingMergePartitionInfo) -> { + if (existingMergePartitionInfo == null || + existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) { + if (existingMergePartitionInfo != null) { + AppAttemptShuffleMergeId appAttemptShuffleMergeId = + new AppAttemptShuffleMergeId( + appShuffleInfo.appId, appShuffleInfo.attemptId, + shuffleId, existingMergePartitionInfo.shuffleMergeId); + try{ + dbKeysToBeRemoved.add( + getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId)); + } catch (Exception e) { + logger.error("Error getting the DB key for {}", + appAttemptShuffleMergeId, e); + } } + return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true); + } else { + dbKeysToBeRemoved.add(entry.getKey()); + return existingMergePartitionInfo; } - return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true); - } else { - dbKeysToBeRemoved.add(entry.getKey()); - return existingMergePartitionInfo; - } - }); - } else { - dbKeysToBeRemoved.add(entry.getKey()); + }); + } else { + dbKeysToBeRemoved.add(entry.getKey()); + } } } } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 9295239f9964f..af3f9b112fb98 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -341,18 +341,19 @@ private void loadSecretsFromDb() throws IOException { logger.info("Recovery location is: " + secretsFile.getPath()); if (db != null) { logger.info("Going to reload spark shuffle data"); - DBIterator itr = db.iterator(); - itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); - while (itr.hasNext()) { - Map.Entry e = itr.next(); - String key = new String(e.getKey(), StandardCharsets.UTF_8); - if (!key.startsWith(APP_CREDS_KEY_PREFIX)) { - break; + try (DBIterator itr = db.iterator()) { + itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { + Map.Entry e = itr.next(); + String key = new String(e.getKey(), StandardCharsets.UTF_8); + if (!key.startsWith(APP_CREDS_KEY_PREFIX)) { + break; + } + String id = parseDbAppKey(key); + ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class); + logger.info("Reloading tokens for app: " + id); + secretManager.registerApp(id, secret); } - String id = parseDbAppKey(key); - ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class); - logger.info("Reloading tokens for app: " + id); - secretManager.registerApp(id, secret); } } }