File: org/apache/hadoop/hdds/utils/HddsServerUtil.java

@@ -620,18 +620,20 @@ public static void writeDBCheckpointToStream(
}
}

-  public static void includeFile(File file, String entryName,
+  public static long includeFile(File file, String entryName,
ArchiveOutputStream archiveOutputStream)
throws IOException {
ArchiveEntry archiveEntry =
archiveOutputStream.createArchiveEntry(file, entryName);
archiveOutputStream.putArchiveEntry(archiveEntry);
+    long bytesWritten;
try (InputStream fis = Files.newInputStream(file.toPath())) {
-      IOUtils.copy(fis, archiveOutputStream);
+      bytesWritten = IOUtils.copy(fis, archiveOutputStream);
archiveOutputStream.flush();
} finally {
archiveOutputStream.closeArchiveEntry();
}
+    return bytesWritten;
}

// Mark tarball completed.
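The return type change above lets callers total up what each entry contributed to the stream: IOUtils.copy reports the number of bytes copied, and includeFile now passes that count through. A minimal caller sketch, assuming commons-compress on the classpath; the tarball name and file list are illustrative, not from the PR:

    import java.io.File;
    import java.io.FileOutputStream;
    import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
    import org.apache.hadoop.hdds.utils.HddsServerUtil;

    public class IncludeFileDemo {
      public static void main(String[] args) throws Exception {
        try (TarArchiveOutputStream tar =
                 new TarArchiveOutputStream(new FileOutputStream("checkpoint.tar"))) {
          long totalBytes = 0L;
          // Illustrative file names; any readable files work here.
          for (String name : new String[] {"000042.sst", "CURRENT"}) {
            File file = new File(name);
            totalBytes += HddsServerUtil.includeFile(file, file.getName(), tar);
          }
          System.out.printf("Wrote %d KB to the archive%n", totalBytes / 1024);
        }
      }
    }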
File: org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.om;

+import static org.apache.commons.io.filefilter.TrueFileFilter.TRUE;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
@@ -56,6 +57,11 @@
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.file.Counters;
+import org.apache.commons.io.file.CountingPathVisitor;
+import org.apache.commons.io.file.PathFilter;
+import org.apache.commons.io.filefilter.SuffixFileFilter;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.recon.ReconConfig;
import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
@@ -69,6 +75,7 @@
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,6 +100,8 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
private static final long serialVersionUID = 1L;
private transient BootstrapStateHandler.Lock lock;
private long maxTotalSstSize = 0;
+  private static final PathFilter SST_FILE_FILTER =
+      new SuffixFileFilter(ROCKSDB_SST_SUFFIX, IOCase.INSENSITIVE);

@Override
public void init() throws ServletException {
@@ -167,6 +176,7 @@ public void writeDbDataToStream(DBCheckpoint checkpoint,
// Files to be excluded from tarball
Map<String, Map<Path, Path>> sstFilesToExclude = normalizeExcludeList(toExcludeList,
checkpoint.getCheckpointLocation(), sstBackupDir);

boolean completed = getFilesForArchive(checkpoint, copyFiles,
hardLinkFiles, sstFilesToExclude, includeSnapshotData(request),
excludedList, sstBackupDir, compactionLogDir);
@@ -270,13 +280,13 @@ public File getTmpDir() {

@SuppressWarnings("checkstyle:ParameterNumber")
private boolean getFilesForArchive(DBCheckpoint checkpoint,
-      Map<String, Map<Path, Path>> copyFiles,
-      Map<Path, Path> hardLinkFiles,
-      Map<String, Map<Path, Path>> sstFilesToExclude,
-      boolean includeSnapshotData,
-      List<String> excluded,
-      DirectoryData sstBackupDir,
-      DirectoryData compactionLogDir)
+          Map<String, Map<Path, Path>> copyFiles,
+          Map<Path, Path> hardLinkFiles,
+          Map<String, Map<Path, Path>> sstFilesToExclude,
+          boolean includeSnapshotData,
+          List<String> excluded,
+          DirectoryData sstBackupDir,
+          DirectoryData compactionLogDir)
throws IOException {

maxTotalSstSize = getConf().getLong(
@@ -290,6 +300,12 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
}

AtomicLong copySize = new AtomicLong(0L);

+    // Log estimated total data transferred on first request.
+    if (sstFilesToExclude.isEmpty()) {
+      logEstimatedTarballSize(checkpoint, includeSnapshotData);
+    }

// Get the active fs files.
Path dir = checkpoint.getCheckpointLocation();
if (!processDir(dir, copyFiles, hardLinkFiles, sstFilesToExclude,
@@ -302,7 +318,7 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
}

// Get the snapshot files.
-    Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
+    Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true);
Path snapshotDir = getSnapshotDir();
if (!processDir(snapshotDir, copyFiles, hardLinkFiles, sstFilesToExclude,
snapshotPaths, excluded, copySize, null)) {
@@ -320,16 +336,40 @@ private boolean getFilesForArchive(DBCheckpoint checkpoint,
hardLinkFiles, sstFilesToExclude,
new HashSet<>(), excluded, copySize,
compactionLogDir.getOriginalDir().toPath());
}

+  private void logEstimatedTarballSize(
+      DBCheckpoint checkpoint, boolean includeSnapshotData) {
+    try {
+      Counters.PathCounters counters = Counters.longPathCounters();
+      CountingPathVisitor visitor = new CountingPathVisitor(
+          counters, SST_FILE_FILTER, TRUE);
+      Files.walkFileTree(checkpoint.getCheckpointLocation(), visitor);
+      int totalSnapshots = 0;
+      if (includeSnapshotData) {
+        Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, false);
+        totalSnapshots = snapshotPaths.size();
+        for (Path snapshotDir: snapshotPaths) {
+          Files.walkFileTree(snapshotDir, visitor);
+        }
+      }
+      LOG.info("Estimates for Checkpoint Tarball Stream - Data size: {} KB, " + "SST files: {}{}",
+          counters.getByteCounter().get() / (1024),
+          counters.getFileCounter().get(),
+          (includeSnapshotData ? ", snapshots: " + totalSnapshots : ""));
+    } catch (Exception e) {
+      LOG.error("Could not estimate size of transfer to Checkpoint Tarball Stream.", e);
+    }
+  }
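logEstimatedTarballSize() reuses one visitor across the checkpoint directory and every snapshot directory, so the byte and file counters accumulate totals for the whole transfer. A self-contained sketch of the same commons-io counting pattern, assuming commons-io 2.9 or later (where IOFileFilter doubles as a PathFilter); the path and the ".sst" literal stand in for the checkpoint location and ROCKSDB_SST_SUFFIX:

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.commons.io.IOCase;
    import org.apache.commons.io.file.Counters;
    import org.apache.commons.io.file.CountingPathVisitor;
    import org.apache.commons.io.filefilter.SuffixFileFilter;
    import org.apache.commons.io.filefilter.TrueFileFilter;

    public class SstEstimateDemo {
      public static void main(String[] args) throws Exception {
        Counters.PathCounters counters = Counters.longPathCounters();
        // Count bytes and files for *.sst only; TrueFileFilter.TRUE lets the
        // walk descend into every directory.
        CountingPathVisitor visitor = new CountingPathVisitor(
            counters, new SuffixFileFilter(".sst", IOCase.INSENSITIVE),
            TrueFileFilter.TRUE);
        Files.walkFileTree(Paths.get("/tmp/om.db.checkpoint"), visitor); // illustrative path
        System.out.printf("%d SST files, %d KB%n",
            counters.getFileCounter().get(),
            counters.getByteCounter().get() / 1024);
      }
    }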

/**
* The snapshotInfo table may contain a snapshot that
* doesn't yet exist on the fs, so wait a few seconds for it.
* @param checkpoint Checkpoint containing snapshot entries expected.
+   * @param waitForDir Wait for dir to exist on fs.
* @return Set of expected snapshot dirs.
*/
-  private Set<Path> waitForSnapshotDirs(DBCheckpoint checkpoint)
+  private Set<Path> getSnapshotDirs(DBCheckpoint checkpoint, boolean waitForDir)
throws IOException {

OzoneConfiguration conf = getConf();
@@ -348,7 +388,9 @@ private Set<Path> getSnapshotDirs(DBCheckpoint checkpoint)
while (iterator.hasNext()) {
Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
Path path = Paths.get(getSnapshotPath(conf, entry.getValue()));
-        waitForDirToExist(path);
+        if (waitForDir) {
+          waitForDirToExist(path);
+        }
snapshotPaths.add(path);
}
} finally {
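waitForDirToExist() is defined elsewhere in the servlet and its body is not part of this diff. As a rough idea of what such a helper typically does, here is a hypothetical polling sketch; the timeout and poll interval are assumptions, not values from the Ozone code:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class DirWaitSketch {
      // Hypothetical: poll until the directory exists or a deadline passes.
      static void waitForDirToExist(Path dir) throws IOException {
        long deadlineMs = System.currentTimeMillis() + 20_000; // assumed timeout
        while (!Files.isDirectory(dir)) {
          if (System.currentTimeMillis() > deadlineMs) {
            throw new IOException("Snapshot dir did not appear: " + dir);
          }
          try {
            Thread.sleep(100); // assumed poll interval
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for " + dir, e);
          }
        }
      }
    }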
@@ -552,25 +594,39 @@ private void writeFilesToArchive(
e.getKey().getFileName().toString().toLowerCase().endsWith(".sst")).
collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

+    long bytesWritten = 0L;
+    int filesWritten = 0;
+    long lastLoggedTime = Time.monotonicNow();

// Go through each of the files to be copied and add to archive.
for (Map.Entry<Path, Path> entry : filteredCopyFiles.entrySet()) {
-      Path file = entry.getValue();
+      Path path = entry.getValue();

// Confirm the data is in the right place.
-      if (!file.toString().startsWith(metaDirPath.toString())) {
+      if (!path.toString().startsWith(metaDirPath.toString())) {
throw new IOException("tarball file not in metadata dir: "
-            + file + ": " + metaDirPath);
+            + path + ": " + metaDirPath);
}

-      String fixedFile = truncateFileName(truncateLength, file);
+      String fixedFile = truncateFileName(truncateLength, path);
if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
// checkpoint files go to root of tarball
Path f = Paths.get(fixedFile).getFileName();
if (f != null) {
fixedFile = f.toString();
}
}
-      includeFile(entry.getKey().toFile(), fixedFile, archiveOutputStream);
+      File file = entry.getKey().toFile();
+      if (!file.isDirectory()) {
+        filesWritten++;
+      }
+      bytesWritten += includeFile(file, fixedFile, archiveOutputStream);
+      // Log progress every 30 seconds
+      if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+        LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
+            bytesWritten / (1024), filesWritten);
+        lastLoggedTime = Time.monotonicNow();
+      }
}

if (completed) {
@@ -595,6 +651,10 @@ private void writeFilesToArchive(
// Mark tarball completed.
includeRatisSnapshotCompleteFlag(archiveOutputStream);
}
LOG.info("Completed transfer of {} KB, #files {} " +
"to checkpoint tarball stream.{}",
bytesWritten / (1024), filesWritten, (completed) ?
" Checkpoint tarball is complete." : "");
}
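The progress log above is throttled with Hadoop's monotonic clock (Time.monotonicNow(), backed by System.nanoTime), so wall-clock adjustments cannot suppress or spam the 30-second interval. The same pattern in isolation; the class and method names here are illustrative:

    import org.apache.hadoop.util.Time;

    public class ProgressLogSketch {
      private static final long LOG_INTERVAL_MS = 30_000;
      private long lastLoggedTime = Time.monotonicNow();
      private long bytesWritten;
      private int filesWritten;

      // Call once per archived file with the bytes that file contributed.
      void onFileArchived(long bytes) {
        bytesWritten += bytes;
        filesWritten++;
        if (Time.monotonicNow() - lastLoggedTime >= LOG_INTERVAL_MS) {
          System.out.printf("Transferred %d KB, #files %d...%n",
              bytesWritten / 1024, filesWritten);
          lastLoggedTime = Time.monotonicNow();
        }
      }
    }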

@Nonnull