Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand All @@ -64,13 +66,15 @@
import org.apache.hadoop.hdds.recon.ReconConfig;
import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -249,20 +253,20 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
if (shouldContinue) {
// we finished transferring files from snapshot DB's by now and
// this is the last step where we transfer the active om.db contents
checkpoint = createAndPrepareCheckpoint(tmpdir, true);
// get the list of sst files of the checkpoint.
checkpoint = createAndPrepareCheckpoint(true);
List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
// unlimited files as we want the Active DB contents to be transferred in a single batch
maxTotalSstSize.set(Long.MAX_VALUE);
Path checkpointDir = checkpoint.getCheckpointLocation();
Map<String, String> hardLinkFileMap = new HashMap<>();
writeDBToArchive(sstFilesToExclude, checkpointDir,
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
if (includeSnapshotData) {
Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, getCompactionLogDir(), false);
writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, getSstBackupDir(), false);
writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, false);
writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(),
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
// This is done to ensure all data to be copied correctly is flushed in the snapshot DB
transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
archiveOutputStream, hardLinkFileMap);
Expand Down Expand Up @@ -317,14 +321,6 @@ private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Se
}
}

@VisibleForTesting
boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
}

private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
if (checkpoint != null) {
try {
Expand Down Expand Up @@ -402,18 +398,30 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
return snapshotPaths;
}

@VisibleForTesting
boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
if (!Files.exists(dbDir)) {
LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
return true;
}
Stream<Path> files = Files.list(dbDir);
return writeDBToArchive(sstFilesToExclude, files,
maxTotalSstSize, archiveOutputStream, tmpDir, hardLinkFileMap, onlySstFile);
}

/**
* Writes database files to the archive, handling deduplication based on inode IDs.
* Here the dbDir could either be a snapshot db directory, the active om.db,
* compaction log dir, sst backup dir.
*
* @param sstFilesToExclude Set of SST file IDs to exclude from the archive
* @param dbDir Directory containing database files to archive
* @param files Stream of files to archive
* @param maxTotalSstSize Maximum total size of SST files to include
* @param archiveOutputStream Archive output stream
* @param tmpDir Temporary directory for processing
* @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication
* @param destDir Destination directory for the archived files. If null,
* the archived files are not moved to this directory.
* @param onlySstFile If true, only SST files are processed. If false, all files are processed.
* <p>
Expand All @@ -424,49 +432,40 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
* @throws IOException if an I/O error occurs
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> files, AtomicLong maxTotalSstSize,
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
if (!Files.exists(dbDir)) {
LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
return true;
}
Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
long bytesWritten = 0L;
int filesWritten = 0;
long lastLoggedTime = Time.monotonicNow();
try (Stream<Path> files = Files.list(dbDir)) {
Iterable<Path> iterable = files::iterator;
for (Path dbFile : iterable) {
if (!Files.isDirectory(dbFile)) {
if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
continue;
Iterable<Path> iterable = files::iterator;
for (Path dbFile : iterable) {
if (!Files.isDirectory(dbFile)) {
if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
continue;
}
String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
if (hardLinkFileMap != null) {
String path = dbFile.toFile().getAbsolutePath();
// if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
if (path.contains(OM_CHECKPOINT_DIR)) {
path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
}
String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
if (hardLinkFileMap != null) {
String path = dbFile.toFile().getAbsolutePath();
if (destDir != null) {
path = destDir.resolve(dbFile.getFileName()).toString();
}
// if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
if (path.contains(OM_CHECKPOINT_DIR)) {
path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
}
hardLinkFileMap.put(path, fileId);
hardLinkFileMap.put(path, fileId);
}
if (!sstFilesToExclude.contains(fileId)) {
long fileSize = Files.size(dbFile);
if (maxTotalSstSize.get() - fileSize <= 0) {
return false;
}
if (!sstFilesToExclude.contains(fileId)) {
long fileSize = Files.size(dbFile);
if (maxTotalSstSize.get() - fileSize <= 0) {
return false;
}
bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
filesWritten++;
maxTotalSstSize.addAndGet(-fileSize);
sstFilesToExclude.add(fileId);
if (Time.monotonicNow() - lastLoggedTime >= 30000) {
LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
bytesWritten / (1024), filesWritten);
lastLoggedTime = Time.monotonicNow();
}
bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
filesWritten++;
maxTotalSstSize.addAndGet(-fileSize);
sstFilesToExclude.add(fileId);
if (Time.monotonicNow() - lastLoggedTime >= 30000) {
LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
bytesWritten / (1024), filesWritten);
lastLoggedTime = Time.monotonicNow();
}
}
}
Expand All @@ -480,21 +479,33 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
* The copy to the temporary directory for compaction log and SST backup files
* is done to maintain a consistent view of the files in these directories.
*
* @param tmpdir Temporary directory for storing checkpoint-related files.
* @param flush If true, flushes in-memory data to disk before checkpointing.
* @return The created database checkpoint.
* @throws IOException If an error occurs during checkpoint creation or file copying.
*/
private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException {
// make tmp directories to contain the copies
Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
private DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws IOException {
// Create & return the checkpoint.
return getDbStore().getCheckpoint(flush);
}

private List<Path> extractSSTFilesFromCompactionLog(DBCheckpoint dbCheckpoint) throws IOException {
List<Path> sstFiles = new ArrayList<>();
try (OmMetadataManagerImpl checkpointMetadataManager =
OmMetadataManagerImpl.createCheckpointMetadataManager(getConf(), dbCheckpoint)) {
try (Table.KeyValueIterator<String, CompactionLogEntry>
iterator = checkpointMetadataManager.getCompactionLogTable().iterator()) {
iterator.seekToFirst();

// Create checkpoint and then copy the files so that it has all the compaction entries and files.
DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
FileUtils.copyDirectory(getCompactionLogDir().toFile(), tmpCompactionLogDir.toFile());
OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), tmpSstBackupDir.toFile());
Path sstBackupDir = getSstBackupDir();

return dbCheckpoint;
while (iterator.hasNext()) {
CompactionLogEntry logEntry = iterator.next().getValue();
logEntry.getInputFileInfoList().forEach(f ->
sstFiles.add(sstBackupDir.resolve(f.getFileName() + ROCKSDB_SST_SUFFIX)));
}
}
} catch (Exception e) {
Copy link

Copilot AI Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching generic Exception is too broad. Catch specific exceptions like RocksDBException or IOException to handle different failure scenarios appropriately and avoid masking unexpected errors.

Copilot uses AI. Check for mistakes.
throw new IOException("Error reading compaction log from checkpoint", e);
}
return sstFiles;
}
}