diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java index 5d04f2060f59..8d2c5881e8f0 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java @@ -620,18 +620,20 @@ public static void writeDBCheckpointToStream( } } - public static void includeFile(File file, String entryName, + public static long includeFile(File file, String entryName, ArchiveOutputStream archiveOutputStream) throws IOException { ArchiveEntry archiveEntry = archiveOutputStream.createArchiveEntry(file, entryName); archiveOutputStream.putArchiveEntry(archiveEntry); + long bytesWritten; try (InputStream fis = Files.newInputStream(file.toPath())) { - IOUtils.copy(fis, archiveOutputStream); + bytesWritten = IOUtils.copy(fis, archiveOutputStream); archiveOutputStream.flush(); } finally { archiveOutputStream.closeArchiveEntry(); } + return bytesWritten; } // Mark tarball completed. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java index 7c02df4bebff..e7df608aed1e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om; +import static org.apache.commons.io.filefilter.TrueFileFilter.TRUE; import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile; import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag; import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR; @@ -56,6 +57,11 @@ import org.apache.commons.compress.archivers.ArchiveOutputStream; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOCase; +import org.apache.commons.io.file.Counters; +import org.apache.commons.io.file.CountingPathVisitor; +import org.apache.commons.io.file.PathFilter; +import org.apache.commons.io.filefilter.SuffixFileFilter; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.recon.ReconConfig; import org.apache.hadoop.hdds.utils.DBCheckpointServlet; @@ -69,6 +75,7 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +100,8 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet { private static final long serialVersionUID = 1L; private transient BootstrapStateHandler.Lock lock; private long maxTotalSstSize = 0; + private static final PathFilter SST_FILE_FILTER = + new SuffixFileFilter(ROCKSDB_SST_SUFFIX, IOCase.INSENSITIVE); @Override public void init() throws ServletException { @@ -167,6 +176,7 @@ public void writeDbDataToStream(DBCheckpoint checkpoint, // Files to be excluded from tarball Map> sstFilesToExclude = normalizeExcludeList(toExcludeList, checkpoint.getCheckpointLocation(), sstBackupDir); + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardLinkFiles, sstFilesToExclude, includeSnapshotData(request), excludedList, sstBackupDir, compactionLogDir); @@ -270,13 +280,13 @@ public File getTmpDir() { @SuppressWarnings("checkstyle:ParameterNumber") private boolean getFilesForArchive(DBCheckpoint checkpoint, - Map> copyFiles, - Map hardLinkFiles, - Map> sstFilesToExclude, - boolean includeSnapshotData, - List excluded, - DirectoryData sstBackupDir, - DirectoryData compactionLogDir) + Map> copyFiles, + Map hardLinkFiles, + Map> sstFilesToExclude, + boolean includeSnapshotData, + List excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { maxTotalSstSize = getConf().getLong( @@ -290,6 +300,12 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint, } AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + // Get the active fs files. Path dir = checkpoint.getCheckpointLocation(); if (!processDir(dir, copyFiles, hardLinkFiles, sstFilesToExclude, @@ -302,7 +318,7 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint, } // Get the snapshot files. - Set snapshotPaths = waitForSnapshotDirs(checkpoint); + Set snapshotPaths = getSnapshotDirs(checkpoint, true); Path snapshotDir = getSnapshotDir(); if (!processDir(snapshotDir, copyFiles, hardLinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, null)) { @@ -320,16 +336,40 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint, hardLinkFiles, sstFilesToExclude, new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath()); + } + private void logEstimatedTarballSize( + DBCheckpoint checkpoint, boolean includeSnapshotData) { + try { + Counters.PathCounters counters = Counters.longPathCounters(); + CountingPathVisitor visitor = new CountingPathVisitor( + counters, SST_FILE_FILTER, TRUE); + Files.walkFileTree(checkpoint.getCheckpointLocation(), visitor); + int totalSnapshots = 0; + if (includeSnapshotData) { + Set snapshotPaths = getSnapshotDirs(checkpoint, false); + totalSnapshots = snapshotPaths.size(); + for (Path snapshotDir: snapshotPaths) { + Files.walkFileTree(snapshotDir, visitor); + } + } + LOG.info("Estimates for Checkpoint Tarball Stream - Data size: {} KB, " + "SST files: {}{}", + counters.getByteCounter().get() / (1024), + counters.getFileCounter().get(), + (includeSnapshotData ? ", snapshots: " + totalSnapshots : "")); + } catch (Exception e) { + LOG.error("Could not estimate size of transfer to Checkpoint Tarball Stream.", e); + } } /** * The snapshotInfo table may contain a snapshot that * doesn't yet exist on the fs, so wait a few seconds for it. * @param checkpoint Checkpoint containing snapshot entries expected. + * @param waitForDir Wait for dir to exist on fs. * @return Set of expected snapshot dirs. */ - private Set waitForSnapshotDirs(DBCheckpoint checkpoint) + private Set getSnapshotDirs(DBCheckpoint checkpoint, boolean waitForDir) throws IOException { OzoneConfiguration conf = getConf(); @@ -348,7 +388,9 @@ private Set waitForSnapshotDirs(DBCheckpoint checkpoint) while (iterator.hasNext()) { Table.KeyValue entry = iterator.next(); Path path = Paths.get(getSnapshotPath(conf, entry.getValue())); - waitForDirToExist(path); + if (waitForDir) { + waitForDirToExist(path); + } snapshotPaths.add(path); } } finally { @@ -552,17 +594,21 @@ private void writeFilesToArchive( e.getKey().getFileName().toString().toLowerCase().endsWith(".sst")). collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + // Go through each of the files to be copied and add to archive. for (Map.Entry entry : filteredCopyFiles.entrySet()) { - Path file = entry.getValue(); + Path path = entry.getValue(); // Confirm the data is in the right place. - if (!file.toString().startsWith(metaDirPath.toString())) { + if (!path.toString().startsWith(metaDirPath.toString())) { throw new IOException("tarball file not in metadata dir: " - + file + ": " + metaDirPath); + + path + ": " + metaDirPath); } - String fixedFile = truncateFileName(truncateLength, file); + String fixedFile = truncateFileName(truncateLength, path); if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) { // checkpoint files go to root of tarball Path f = Paths.get(fixedFile).getFileName(); @@ -570,7 +616,17 @@ private void writeFilesToArchive( fixedFile = f.toString(); } } - includeFile(entry.getKey().toFile(), fixedFile, archiveOutputStream); + File file = entry.getKey().toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, fixedFile, archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } } if (completed) { @@ -595,6 +651,10 @@ private void writeFilesToArchive( // Mark tarball completed. includeRatisSnapshotCompleteFlag(archiveOutputStream); } + LOG.info("Completed transfer of {} KB, #files {} " + + "to checkpoint tarball stream.{}", + bytesWritten / (1024), filesWritten, (completed) ? + " Checkpoint tarball is complete." : ""); } @Nonnull