diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index f967e30ec52f..27e7f1c2d6d6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -44,11 +44,13 @@
 import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -64,6 +66,7 @@
 import org.apache.hadoop.hdds.recon.ReconConfig;
 import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
@@ -71,6 +74,7 @@
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -249,7 +253,9 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
       if (shouldContinue) {
         // we finished transferring files from snapshot DB's by now and
         // this is the last step where we transfer the active om.db contents
-        checkpoint = createAndPrepareCheckpoint(tmpdir, true);
+        checkpoint = createAndPrepareCheckpoint(true);
+        // Get the list of SST backup files referenced by the checkpoint's compaction log.
+        List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
         // unlimited files as we want the Active DB contents to be transferred in a single batch
         maxTotalSstSize.set(Long.MAX_VALUE);
         Path checkpointDir = checkpoint.getCheckpointLocation();
@@ -257,12 +263,10 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
         writeDBToArchive(sstFilesToExclude, checkpointDir, maxTotalSstSize, archiveOutputStream, tmpdir,
             hardLinkFileMap, false);
         if (includeSnapshotData) {
-          Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
-          Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
-          writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-              hardLinkFileMap, getCompactionLogDir(), false);
-          writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-              hardLinkFileMap, getSstBackupDir(), false);
+          writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
+              hardLinkFileMap, false);
+          writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(),
+              maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
           // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
           transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
               archiveOutputStream, hardLinkFileMap);
@@ -317,14 +321,6 @@ private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Se
     }
   }
 
-  @VisibleForTesting
-  boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
-      AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
-      Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
-    return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
-        archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
-  }
-
   private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
     if (checkpoint != null) {
       try {
@@ -402,18 +398,30 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
     return snapshotPaths;
   }
+
+  @VisibleForTesting
+  boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
+      ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
+      Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
+    if (!Files.exists(dbDir)) {
+      LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
+      return true;
+    }
+    Stream<Path> files = Files.list(dbDir);
+    return writeDBToArchive(sstFilesToExclude, files,
+        maxTotalSstSize, archiveOutputStream, tmpDir, hardLinkFileMap, onlySstFile);
+  }
+
   /**
    * Writes database files to the archive, handling deduplication based on inode IDs.
    * Here the dbDir could either be a snapshot db directory, the active om.db,
    * compaction log dir, sst backup dir.
    *
    * @param sstFilesToExclude Set of SST file IDs to exclude from the archive
-   * @param dbDir Directory containing database files to archive
+   * @param files Stream of files to archive
    * @param maxTotalSstSize Maximum total size of SST files to include
    * @param archiveOutputStream Archive output stream
    * @param tmpDir Temporary directory for processing
    * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication
-   * @param destDir Destination directory for the archived files. If null,
-   *                the archived files are not moved to this directory.
    * @param onlySstFile If true, only SST files are processed.
    *                    If false, all files are processed.
    *
@@ -424,49 +432,40 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
    * @throws IOException if an I/O error occurs
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
-  private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
+  private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> files, AtomicLong maxTotalSstSize,
       ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
-      Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
-    if (!Files.exists(dbDir)) {
-      LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
-      return true;
-    }
+      Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
     long bytesWritten = 0L;
     int filesWritten = 0;
     long lastLoggedTime = Time.monotonicNow();
-    try (Stream<Path> files = Files.list(dbDir)) {
-      Iterable<Path> iterable = files::iterator;
-      for (Path dbFile : iterable) {
-        if (!Files.isDirectory(dbFile)) {
-          if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
-            continue;
+    Iterable<Path> iterable = files::iterator;
+    for (Path dbFile : iterable) {
+      if (!Files.isDirectory(dbFile)) {
+        if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
+          continue;
+        }
+        String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
+        if (hardLinkFileMap != null) {
+          String path = dbFile.toFile().getAbsolutePath();
+          // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
+          if (path.contains(OM_CHECKPOINT_DIR)) {
+            path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
           }
-          String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
-          if (hardLinkFileMap != null) {
-            String path = dbFile.toFile().getAbsolutePath();
-            if (destDir != null) {
-              path = destDir.resolve(dbFile.getFileName()).toString();
-            }
-            // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
-            if (path.contains(OM_CHECKPOINT_DIR)) {
-              path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
-            }
-            hardLinkFileMap.put(path, fileId);
+          hardLinkFileMap.put(path, fileId);
+        }
+        if (!sstFilesToExclude.contains(fileId)) {
+          long fileSize = Files.size(dbFile);
+          if (maxTotalSstSize.get() - fileSize <= 0) {
+            return false;
           }
-          if (!sstFilesToExclude.contains(fileId)) {
-            long fileSize = Files.size(dbFile);
-            if (maxTotalSstSize.get() - fileSize <= 0) {
-              return false;
-            }
-            bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
-            filesWritten++;
-            maxTotalSstSize.addAndGet(-fileSize);
-            sstFilesToExclude.add(fileId);
-            if (Time.monotonicNow() - lastLoggedTime >= 30000) {
-              LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
-                  bytesWritten / (1024), filesWritten);
-              lastLoggedTime = Time.monotonicNow();
-            }
+          bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
+          filesWritten++;
+          maxTotalSstSize.addAndGet(-fileSize);
+          sstFilesToExclude.add(fileId);
+          if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+            LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
+                bytesWritten / (1024), filesWritten);
+            lastLoggedTime = Time.monotonicNow();
          }
        }
      }
@@ -480,21 +479,33 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
    * The copy to the temporary directory for compaction log and SST backup files
    * is done to maintain a consistent view of the files in these directories.
    *
-   * @param tmpdir Temporary directory for storing checkpoint-related files.
    * @param flush If true, flushes in-memory data to disk before checkpointing.
-   * @return The created database checkpoint.
    * @throws IOException If an error occurs during checkpoint creation or file copying.
    */
-  private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException {
-    // make tmp directories to contain the copies
-    Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
-    Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
+  private DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws IOException {
+    // Create & return the checkpoint.
+    return getDbStore().getCheckpoint(flush);
+  }
+
+  private List<Path> extractSSTFilesFromCompactionLog(DBCheckpoint dbCheckpoint) throws IOException {
+    List<Path> sstFiles = new ArrayList<>();
+    try (OmMetadataManagerImpl checkpointMetadataManager =
+        OmMetadataManagerImpl.createCheckpointMetadataManager(getConf(), dbCheckpoint)) {
+      try (Table.KeyValueIterator<byte[], CompactionLogEntry>
+          iterator = checkpointMetadataManager.getCompactionLogTable().iterator()) {
+        iterator.seekToFirst();
 
-    // Create checkpoint and then copy the files so that it has all the compaction entries and files.
-    DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
-    FileUtils.copyDirectory(getCompactionLogDir().toFile(), tmpCompactionLogDir.toFile());
-    OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), tmpSstBackupDir.toFile());
+        Path sstBackupDir = getSstBackupDir();
 
-    return dbCheckpoint;
+        while (iterator.hasNext()) {
+          CompactionLogEntry logEntry = iterator.next().getValue();
+          logEntry.getInputFileInfoList().forEach(f ->
+              sstFiles.add(sstBackupDir.resolve(f.getFileName() + ROCKSDB_SST_SUFFIX)));
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Error reading compaction log from checkpoint", e);
+    }
+    return sstFiles;
   }
 }
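
For anyone who wants to experiment with the shape of this refactor outside Ozone, here is a minimal, self-contained sketch of the same idea: a single archive writer that consumes an arbitrary `Stream<Path>`, with a thin directory-listing overload on top, deduplicating by filesystem file key. Everything below is illustrative, not the OM servlet code: the `StreamingTarWriter` class and its method names are hypothetical, and `BasicFileAttributes.fileKey()` stands in for `OmSnapshotUtils.getFileInodeAndLastModifiedTimeString`. It uses only JDK and commons-compress APIs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;

/** Illustrative sketch of a stream-based tar writer with inode-style dedup. */
public final class StreamingTarWriter {

  private StreamingTarWriter() {
  }

  /**
   * Writes every regular file in {@code files} to the tar stream, skipping
   * entries whose file key (inode on POSIX; may be null on some filesystems)
   * has already been written. This mirrors the dedup the servlet performs
   * with its inode+mtime file ids and hardLinkFileMap.
   */
  public static void writeToTar(Stream<Path> files, Set<Object> seenFileKeys,
      TarArchiveOutputStream tar) throws IOException {
    Iterable<Path> iterable = files::iterator;
    for (Path file : iterable) {
      if (Files.isDirectory(file)) {
        continue;
      }
      Object fileKey = Files.readAttributes(file, BasicFileAttributes.class).fileKey();
      if (!seenFileKeys.add(fileKey)) {
        continue; // hard link to an already-archived file
      }
      // TarArchiveEntry(File, String) picks up the file's size and mtime.
      TarArchiveEntry entry = new TarArchiveEntry(file.toFile(), file.getFileName().toString());
      tar.putArchiveEntry(entry);
      Files.copy(file, tar);
      tar.closeArchiveEntry();
    }
  }

  /** Directory overload mirroring the PR: list the dir, delegate to the stream variant. */
  public static void writeDirToTar(Path dir, Set<Object> seenFileKeys,
      TarArchiveOutputStream tar) throws IOException {
    if (!Files.exists(dir)) {
      return; // nothing to archive, matching the servlet's "skip missing dir" behavior
    }
    try (Stream<Path> files = Files.list(dir)) {
      writeToTar(files, seenFileKeys, tar);
    }
  }
}
```

Because the stream variant never lists a directory itself, callers can feed it either a live `Files.list(dir)` or a precomputed collection, analogous to the compaction-log-derived `sstBackupFiles.stream()` above. The sketch scopes the `Files.list` stream in try-with-resources so the underlying directory handle is released promptly, a detail worth keeping in mind wherever the listing and the consumption are split across methods.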