diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index b2ea9cd33a0b..d0040e29e7e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -23,8 +23,12 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -437,19 +441,13 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
       LOG.trace("Created archive directory {}", baseArchiveDir);
     }
 
-    List<File> failures = new ArrayList<>();
+    List<File> failures = Collections.synchronizedList(new ArrayList<>());
     String startTime = Long.toString(start);
+    List<File> filesOnly = new ArrayList<>();
     for (File file : toArchive) {
       // if its a file archive it
       try {
-        LOG.trace("Archiving {}", file);
-        if (file.isFile()) {
-          // attempt to archive the file
-          if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
-            LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
-            failures.add(file);
-          }
-        } else {
+        if (!file.isFile()) {
           // otherwise its a directory and we need to archive all files
           LOG.trace("{} is a directory, archiving children files", file);
           // so we add the directory name to the one base archive
@@ -458,12 +456,54 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
           // archive those too
           Collection<File> children = file.getChildren();
           failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
+        } else {
+          filesOnly.add(file);
         }
       } catch (IOException e) {
         LOG.warn("Failed to archive {}", file, e);
         failures.add(file);
       }
     }
 
+    // all plain files in the current baseDir are archived concurrently
+    ExecutorService executorService = Executors.newFixedThreadPool(25);
+    Map<File, Future<Boolean>> futures = new HashMap<>();
+    for (File file : filesOnly) {
+      LOG.trace("Archiving {}", file);
+      Future<Boolean> archiveTask =
+        executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
+      futures.put(file, archiveTask);
+    }
+
+    executorService.shutdown();
+
+    for (Map.Entry<File, Future<Boolean>> fileFutureEntry : futures.entrySet()) {
+      try {
+        boolean fileCleaned = fileFutureEntry.getValue().get();
+        if (!fileCleaned) {
+          LOG.warn("Couldn't archive {} into backup directory: {}", fileFutureEntry.getKey(),
+            baseArchiveDir);
+          failures.add(fileFutureEntry.getKey());
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("HFileArchive cleanup thread was interrupted");
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        // the cause is the IOException thrown by resolveAndArchiveFile
+        LOG.warn("Failed to archive {}", fileFutureEntry.getKey(), e);
+        failures.add(fileFutureEntry.getKey());
+      }
+    }
+
+    // every Future has been drained above, so the pool is idle and this returns promptly
+    try {
+      if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("HFileArchive cleanup thread was interrupted while shutting down");
+      Thread.currentThread().interrupt();
+    }
+
     return failures;
   }
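
For review context, the following is a minimal, self-contained sketch of the submit-then-collect pattern the patch introduces; ParallelArchiveSketch and archiveOne are hypothetical stand-ins (archiveOne plays the role of resolveAndArchiveFile), not part of the patch:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelArchiveSketch {

  // Hypothetical stand-in for resolveAndArchiveFile: returns true on success.
  static boolean archiveOne(String file) {
    return !file.endsWith(".bad");
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> files = List.of("a.hfile", "b.hfile", "c.bad");
    ExecutorService pool = Executors.newFixedThreadPool(4);

    // Submit every file first so the pool can work on them concurrently.
    Map<String, Future<Boolean>> futures = new HashMap<>();
    for (String f : files) {
      futures.put(f, pool.submit(() -> archiveOne(f)));
    }
    pool.shutdown(); // stop accepting new tasks; already-submitted tasks still run

    // Collect results; get() blocks until the corresponding task finishes,
    // so by the end of this loop every task is done.
    List<String> failures = new ArrayList<>();
    for (Map.Entry<String, Future<Boolean>> e : futures.entrySet()) {
      try {
        if (!e.getValue().get()) {
          failures.add(e.getKey());
        }
      } catch (ExecutionException ex) {
        // the task threw (an IOException in the real code)
        failures.add(e.getKey());
      }
    }
    pool.awaitTermination(10, TimeUnit.SECONDS); // effectively immediate here
    System.out.println("failed to archive: " + failures);
  }
}

The ordering is the point of the sketch: draining every Future before awaiting termination means a timeout-triggered shutdownNow() can never cancel a task whose result is still going to be read, which would otherwise surface as an uncaught CancellationException from get().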