diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 7ddd20bde35c..9051ad1c7375 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -125,6 +125,10 @@ protected synchronized void setInterval(long newInterval, TimeUnit newUnit) { this.unit = newUnit; } + protected synchronized long getIntervalMillis() { + return this.unit.toMillis(interval); + } + public abstract BackgroundTaskQueue getTasks(); /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index bd043c7346a2..4ee29a5aee2d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -152,7 +153,7 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { private int ratisByteLimit; private final SnapshotChainManager snapshotChainManager; private final boolean deepCleanSnapshots; - private final ExecutorService deletionThreadPool; + private ExecutorService deletionThreadPool; private final int numberOfParallelThreadsPerStore; private final AtomicLong deletedDirsCount; private final AtomicLong movedDirsCount; @@ -168,9 +169,8 @@ public DirectoryDeletingService(long interval, TimeUnit unit, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); this.numberOfParallelThreadsPerStore = dirDeletingServiceCorePoolSize; - this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, interval, unit, - new LinkedBlockingDeque<>(Integer.MAX_VALUE)); - + this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, + interval, unit, new LinkedBlockingDeque<>(Integer.MAX_VALUE)); // always go to 90% of max limit for request as other header will be added this.ratisByteLimit = (int) (limit * 0.9); registerReconfigCallbacks(ozoneManager.getReconfigurationHandler()); @@ -226,9 +226,33 @@ public BackgroundTaskQueue getTasks() { @Override public void shutdown() { + if (deletionThreadPool != null) { + deletionThreadPool.shutdown(); + try { + if (!deletionThreadPool.awaitTermination(60, TimeUnit.SECONDS)) { + deletionThreadPool.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + deletionThreadPool.shutdownNow(); + } + } super.shutdown(); } + @Override + public synchronized void start() { + if (deletionThreadPool == null || deletionThreadPool.isShutdown() || deletionThreadPool.isTerminated()) { + this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, + super.getIntervalMillis(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE)); + } + super.start(); + } + + private boolean isThreadPoolActive(ExecutorService threadPoolExecutor) { + return threadPoolExecutor != null && !threadPoolExecutor.isShutdown() && !threadPoolExecutor.isTerminated(); + } + @SuppressWarnings("checkstyle:ParameterNumber") void optimizeDirDeletesAndSubmitRequest( long dirNum, long subDirNum, long subFileNum, @@ -238,7 +262,7 @@ void optimizeDirDeletesAndSubmitRequest( long remainingBufLimit, KeyManager keyManager, CheckedFunction, Boolean, IOException> reclaimableDirChecker, CheckedFunction, Boolean, IOException> reclaimableFileChecker, - UUID expectedPreviousSnapshotId, long rnCnt) { + UUID expectedPreviousSnapshotId, long rnCnt) throws InterruptedException { // Optimization to handle delete sub-dir and keys to remove quickly // This case will be useful to handle when depth of directory is high @@ -435,7 +459,7 @@ private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest( } private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List requests, - String snapTableKey, UUID expectedPreviousSnapshotId) { + String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException { OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest = OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder(); @@ -461,7 +485,7 @@ private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List a && b); } // If AOS or all directories have been processed for snapshot, update snapshot size delta and deep clean flag @@ -572,7 +599,7 @@ private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyMan */ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, DeletedDirSupplier dirSupplier, long remainingBufLimit, UUID expectedPreviousSnapshotId, - Map> totalExclusiveSizeMap, long runCount) { + Map> totalExclusiveSizeMap, long runCount) throws InterruptedException { OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock(); String snapshotTableKey = currentSnapshotInfo == null ? null : currentSnapshotInfo.getTableKey(); @@ -676,9 +703,13 @@ public BackgroundTaskResult call() { : omSnapshot.get().getKeyManager(); processDeletedDirsForStore(snapInfo, keyManager, ratisByteLimit, run); } - } catch (IOException | ExecutionException | InterruptedException e) { + } catch (IOException | ExecutionException e) { LOG.error("Error while running delete files background task for store {}. Will retry at next run.", snapInfo, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Interruption running delete directory background task for store {}.", + snapInfo, e); } } // By design, no one cares about the results of this call back. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 15a620a8e492..c9c1237c27eb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -127,7 +127,7 @@ public AtomicLong getDeletedKeyCount() { Pair processKeyDeletes(List keyBlocksList, Map keysToModify, List renameEntries, - String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException { + String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException, InterruptedException { long startTime = Time.monotonicNow(); Pair purgeResult = Pair.of(0, false); if (LOG.isDebugEnabled()) { @@ -166,7 +166,7 @@ Pair processKeyDeletes(List keyBlocksList, */ private Pair submitPurgeKeysRequest(List results, Map keysToModify, List renameEntriesToBeDeleted, - String snapTableKey, UUID expectedPreviousSnapshotId) { + String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException { List purgeKeys = new ArrayList<>(); // Put all keys to be purged in a list @@ -252,7 +252,7 @@ private Pair submitPurgeKeysRequest(List mocked = mockStatic(OzoneManagerRatisUtils.class, CALLS_REAL_METHODS)) {