Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand All @@ -64,13 +66,15 @@
import org.apache.hadoop.hdds.recon.ReconConfig;
import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -251,12 +255,12 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
if (shouldContinue) {
// we finished transferring files from snapshot DB's by now and
// this is the last step where we transfer the active om.db contents
checkpoint = createAndPrepareCheckpoint(tmpdir, true);
List<Path> sstFiles = new ArrayList<>();
checkpoint = createAndPrepareCheckpoint(tmpdir, true, sstFiles);
// unlimited files as we want the Active DB contents to be transferred in a single batch
maxTotalSstSize.set(Long.MAX_VALUE);
Path checkpointDir = checkpoint.getCheckpointLocation();
writeDBToArchive(sstFilesToExclude, checkpointDir,
Comment thread
jojochuang marked this conversation as resolved.
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
writeDBToArchive(sstFilesToExclude, sstFiles.stream(),
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, null, false);
if (includeSnapshotData) {
Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
Comment thread
jojochuang marked this conversation as resolved.
Outdated
Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
Expand Down Expand Up @@ -403,13 +407,25 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
return snapshotPaths;
}

/**
 * Archives the entries directly contained in {@code dbDir} by listing the
 * directory and delegating to the {@code Stream<Path>} overload of
 * {@code writeDBToArchive}.
 *
 * @param sstFilesToExclude passed through unchanged to the stream overload
 * @param dbDir directory whose immediate entries are listed and archived
 * @param maxTotalSstSize passed through unchanged to the stream overload
 * @param archiveOutputStream passed through unchanged to the stream overload
 * @param tmpDir passed through unchanged to the stream overload
 * @param hardLinkFileMap passed through unchanged to the stream overload
 * @param destDir passed through unchanged to the stream overload
 * @param onlySstFile passed through unchanged to the stream overload
 * @return {@code true} if {@code dbDir} does not exist (nothing is written);
 *         otherwise whatever the stream overload returns
 * @throws IOException if listing the directory or the delegated write fails
 */
private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
    ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
    Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
  if (!Files.exists(dbDir)) {
    LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
    return true;
  }
  // Files.list keeps the directory open until the stream is closed, so this
  // method, which opens the stream, is also responsible for closing it once
  // the delegate has finished iterating.
  try (Stream<Path> files = Files.list(dbDir)) {
    return writeDBToArchive(sstFilesToExclude, files,
        maxTotalSstSize, archiveOutputStream, tmpDir, hardLinkFileMap, destDir, onlySstFile);
  }
}

/**
* Writes database files to the archive, handling deduplication based on inode IDs.
* Here the dbDir could either be a snapshot db directory, the active om.db,
* compaction log dir, sst backup dir.
*
* @param sstFilesToExclude Set of SST file IDs to exclude from the archive
* @param dbDir Directory containing database files to archive
* @param files Stream of files to archive
* @param maxTotalSstSize Maximum total size of SST files to include
* @param archiveOutputStream Archive output stream
* @param tmpDir Temporary directory for processing
Expand All @@ -425,17 +441,12 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
* @throws IOException if an I/O error occurs
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> files, AtomicLong maxTotalSstSize,
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
if (!Files.exists(dbDir)) {
LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
return true;
}
long bytesWritten = 0L;
int filesWritten = 0;
long lastLoggedTime = Time.monotonicNow();
try (Stream<Path> files = Files.list(dbDir)) {
Iterable<Path> iterable = files::iterator;
for (Path dbFile : iterable) {
if (!Files.isDirectory(dbFile)) {
Expand Down Expand Up @@ -469,7 +480,6 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
}
}
}
}
return true;
}

Expand All @@ -481,18 +491,32 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
*
* @param tmpdir Temporary directory for storing checkpoint-related files.
* @param flush If true, flushes in-memory data to disk before checkpointing.
* @return The created database checkpoint.
* @return The created database checkpoint; SST file paths read from its compaction log are added to {@code sstFiles}.
* @throws IOException If an error occurs during checkpoint creation or file copying.
*/
private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException {
private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush, List<Path> sstFiles) throws IOException {
// make tmp directories to contain the copies
Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
Files.createDirectories(tmpSstBackupDir);
Comment thread
jojochuang marked this conversation as resolved.
Outdated

// Create checkpoint and then copy the files so that it has all the compaction entries and files.
// Create checkpoint.
DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
FileUtils.copyDirectory(getCompactionLogDir().toFile(), tmpCompactionLogDir.toFile());
OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), tmpSstBackupDir.toFile());

try (OmMetadataManagerImpl checkpointMetadataManager =
Comment thread
jojochuang marked this conversation as resolved.
OmMetadataManagerImpl.createCheckpointMetadataManager(getConf(), dbCheckpoint)) {
try (Table.KeyValueIterator<String, CompactionLogEntry>
iterator = checkpointMetadataManager.getCompactionLogTable().iterator()) {
iterator.seekToFirst();

while (iterator.hasNext()) {
CompactionLogEntry logEntry = iterator.next().getValue();
logEntry.getInputFileInfoList().forEach(f ->
sstFiles.add(Paths.get(f.getFileName())));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path would only contain the file name here, right? That is, something like "x.sst". So when the file stream is passed to the writeDBToArchive method, I believe the absolute path wouldn't be available, and the file won't be copied to the tarball.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sadanand48 good catch. Can you check again now?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, looks good now. As Swami suggested, I will test this part in #9132; I need to make changes there.

}
}
} catch (Exception e) {

Copilot AI Oct 17, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching generic Exception is too broad. Catch specific exceptions like RocksDBException or IOException to handle different failure scenarios appropriately and avoid masking unexpected errors.

Copilot uses AI. Check for mistakes.
throw new IOException("Error reading compaction log from checkpoint", e);
}

return dbCheckpoint;
}
Expand Down