diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a9a07371a1b8..e035c5ef97b1 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2021,6 +2021,15 @@
       This fallback approach is not recommended for production environments.
     </description>
   </property>
+  <property>
+    <name>ozone.om.ratis.snapshot.max.total.sst.size</name>
+    <value>100000000</value>
+    <tag>OZONE, OM, RATIS</tag>
+    <description>
+      Maximum total size of the SST files included in a single OM Ratis
+      snapshot tarball. Remaining files are sent in follow-up tarballs.
+    </description>
+  </property>
   <property>
     <name>ozone.om.snapshot.provider.socket.timeout</name>
     <value>5000s</value>
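The same limit can also be set programmatically, as the integration test later in this patch does through om.getConfiguration(). A minimal sketch, assuming nothing beyond an OzoneConfiguration instance:

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public final class SnapshotSizeConfigExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Lower the per-tarball SST budget to ~50 MB (the value is in bytes).
    conf.setLong("ozone.om.ratis.snapshot.max.total.sst.size", 50_000_000L);
    System.out.println(conf.getLong(
        "ozone.om.ratis.snapshot.max.total.sst.size", 100_000_000L));
  }
}
```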
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index c6840d39d53f..2c59bc1e3cfa 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -109,6 +109,9 @@ public final class HddsServerUtil {
private HddsServerUtil() {
}
+ public static final String OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME =
+ "OZONE_RATIS_SNAPSHOT_COMPLETE";
+
private static final Logger LOG = LoggerFactory.getLogger(
HddsServerUtil.class);
@@ -590,6 +593,7 @@ public static void writeDBCheckpointToStream(
}
}
}
+ includeRatisSnapshotCompleteFlag(archiveOutputStream);
}
}
@@ -605,6 +609,20 @@ public static void includeFile(File file, String entryName,
archiveOutputStream.closeArchiveEntry();
}
+  // Add an empty flag entry marking the snapshot tarball as complete.
+ public static void includeRatisSnapshotCompleteFlag(
+ ArchiveOutputStream archiveOutput) throws IOException {
+ File file = File.createTempFile(
+ OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME, "");
+ String entryName = OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
+ includeFile(file, entryName, archiveOutput);
+ }
+
+ static boolean ratisSnapshotComplete(Path dir) {
+ return new File(dir.toString(),
+ OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME).exists();
+ }
+
// optimize ugi lookup for RPC operations to avoid a trip through
// UGI.getCurrentUser which is synch'ed
public static UserGroupInformation getRemoteUser() throws IOException {
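The flag above is nothing more than a zero-length tar entry with a well-known name; the follower treats its presence after extraction as proof that the tarball set was not truncated mid-transfer. A self-contained sketch of the same pattern with commons-compress (the class and method names here are illustrative, not the Ozone helpers):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;

public final class MarkerEntryDemo {
  private static final String FLAG = "OZONE_RATIS_SNAPSHOT_COMPLETE";

  // Sender: append an empty entry whose name alone carries the signal.
  static void writeWithMarker(File tarFile) throws IOException {
    try (TarArchiveOutputStream out =
             new TarArchiveOutputStream(new FileOutputStream(tarFile))) {
      TarArchiveEntry marker = new TarArchiveEntry(FLAG);
      marker.setSize(0);
      out.putArchiveEntry(marker);
      out.closeArchiveEntry();
    }
  }

  // Receiver: after untarring, the flag file's existence means the
  // transfer completed (this mirrors ratisSnapshotComplete above).
  static boolean isComplete(File untarredDir) {
    return new File(untarredDir, FLAG).exists();
  }
}
```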
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
index 261e4e103dab..2ebb56179c08 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
@@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.ratisSnapshotComplete;
import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_CANDIDATE_DIR;
/**
@@ -110,21 +111,26 @@ public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID)
"reloading state from the snapshot.", leaderNodeID);
checkLeaderConsistency(leaderNodeID);
- String snapshotFileName = getSnapshotFileName(leaderNodeID);
- File targetFile = new File(snapshotDir, snapshotFileName);
- downloadSnapshot(leaderNodeID, targetFile);
- LOG.info("Successfully download the latest snapshot {} from leader OM: {}",
- targetFile, leaderNodeID);
-
- numDownloaded.incrementAndGet();
- injectPause();
-
- RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
- candidateDir, true);
- LOG.info("Successfully untar the downloaded snapshot {} at {}.", targetFile,
- checkpoint.getCheckpointLocation());
-
- return checkpoint;
+ while (true) {
+ String snapshotFileName = getSnapshotFileName(leaderNodeID);
+ File targetFile = new File(snapshotDir, snapshotFileName);
+ downloadSnapshot(leaderNodeID, targetFile);
+      LOG.info(
+          "Successfully downloaded the latest snapshot {} from leader OM: {}",
+          targetFile, leaderNodeID);
+
+ numDownloaded.incrementAndGet();
+ injectPause();
+
+ RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
+ candidateDir, true);
+      LOG.info("Successfully untarred the downloaded snapshot {} at {}.",
+          targetFile, checkpoint.getCheckpointLocation());
+ if (ratisSnapshotComplete(checkpoint.getCheckpointLocation())) {
+ LOG.info("Ratis snapshot transfer is complete.");
+ return checkpoint;
+ }
+ }
}
/**
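The loop turns a single download into a download-until-complete cycle. It relies on the leader resuming where the previous tarball left off: each follow-up request's exclude list is built from the SST files already present in the candidate directory, so successive tarballs carry only new files. Reduced to its control flow, against a hypothetical TarballFetcher interface (not the provider's real API):

```java
import java.io.IOException;
import java.nio.file.Path;

interface TarballFetcher {
  // Fetch the next tarball from the leader and untar it into the
  // candidate directory; returns the extraction location.
  Path fetchAndUntar() throws IOException;

  // True once the OZONE_RATIS_SNAPSHOT_COMPLETE flag file is present.
  boolean complete(Path untarredDir);
}

final class MultiTarballDownload {
  static Path downloadAll(TarballFetcher fetcher) throws IOException {
    while (true) {
      Path dir = fetcher.fetchAndUntar();
      if (fetcher.complete(dir)) {
        return dir;   // last tarball; hard-link list and flag included
      }
      // Not complete yet: loop again for the remaining SST files.
    }
  }
}
```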
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index c5c07e285763..1da522495227 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -202,6 +202,11 @@ private OMConfigKeys() {
public static final long
OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT = 400000;
+ public static final String OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY
+ = "ozone.om.ratis.snapshot.max.total.sst.size";
+ public static final long
+ OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT = 100_000_000;
+
// OM Ratis server configurations
public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
= "ozone.om.ratis.server.request.timeout";
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index c16b31cc3693..b57fbbb78efe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -71,6 +71,7 @@
import org.apache.commons.io.FileUtils;
import static org.apache.hadoop.hdds.recon.ReconConfig.ConfigStrings.OZONE_RECON_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
@@ -703,7 +704,9 @@ private Set<String> getFiles(Path path, int truncateLength,
if (file.toFile().isDirectory()) {
getFiles(file, truncateLength, fileSet);
}
- if (!file.getFileName().toString().startsWith("fabricated")) {
+ String filename = file.getFileName().toString();
+ if (!filename.startsWith("fabricated") &&
+ !filename.startsWith(OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME)) {
fileSet.add(truncateFileName(truncateLength, file));
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 5cd5b788495d..94401b8dcc42 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -16,6 +16,9 @@
*/
package org.apache.hadoop.ozone.om;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.ExitManager;
@@ -64,6 +67,8 @@
import org.slf4j.event.Level;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -71,8 +76,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -83,6 +90,7 @@
import java.util.stream.Stream;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
@@ -197,6 +205,13 @@ public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
}
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+    List<Set<String>> sstSetList = new ArrayList<>();
+ FaultInjector faultInjector =
+ new SnapshotMaxSizeInjector(leaderOM,
+ followerOM.getOmSnapshotProvider().getSnapshotDir(),
+ sstSetList);
+ followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
// Create some snapshots, each with new keys
int keyIncrement = 10;
String snapshotNamePrefix = "snapshot";
@@ -283,6 +298,17 @@ public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
*/
checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
+ int sstFileCount = 0;
+    Set<String> sstFileUnion = new HashSet<>();
+    for (Set<String> sstFiles : sstSetList) {
+ sstFileCount += sstFiles.size();
+ sstFileUnion.addAll(sstFiles);
+ }
+ // Confirm that there were multiple tarballs.
+ assertTrue(sstSetList.size() > 1);
+ // Confirm that there was no overlap of sst files
+ // between the individual tarballs.
+ assertEquals(sstFileUnion.size(), sstFileCount);
}
private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,
@@ -1110,4 +1136,108 @@ public void reset() throws IOException {
init();
}
}
+
+  // Interrupts the tarball download process to verify that multiple
+  // tarballs are created when the total SST size exceeds the
+  // configured max.
+ private static class SnapshotMaxSizeInjector extends FaultInjector {
+ private final OzoneManager om;
+ private int count;
+ private final File snapshotDir;
+    private final List<Set<String>> sstSetList;
+    private final Path tempDir;
+    SnapshotMaxSizeInjector(OzoneManager om, File snapshotDir,
+        List<Set<String>> sstSetList) throws IOException {
+ this.om = om;
+ this.snapshotDir = snapshotDir;
+ this.sstSetList = sstSetList;
+ this.tempDir = Files.createTempDirectory("tmpDirPrefix");
+ init();
+ }
+
+ @Override
+ public void init() {
+ }
+
+    // Pause each time a tarball is received, to process it.
+    @Override
+    public void pause() throws IOException {
+ count++;
+ File tarball = getTarball(snapshotDir);
+ // First time through, get total size of sst files and reduce
+ // max size config. That way next time through, we get multiple
+ // tarballs.
+ if (count == 1) {
+ long sstSize = getSizeOfSstFiles(tarball);
+ om.getConfiguration().setLong(
+ OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, sstSize / 2);
+ // Now empty the tarball to restart the download
+ // process from the beginning.
+ createEmptyTarball(tarball);
+      } else {
+        // Each time we get a new tarball, add the set of
+        // its sst files to the list (i.e. one set per tarball).
+ sstSetList.add(getSstFilenames(tarball));
+ }
+ }
+
+    // Get the total size of the sst files in the tarball.
+    private long getSizeOfSstFiles(File tarball) throws IOException {
+      FileUtil.unTar(tarball, tempDir.toFile());
+      List<Path> sstPaths = Files.walk(tempDir).filter(
+          path -> path.toString().endsWith(".sst")).
+          collect(Collectors.toList());
+      long sstSize = 0;
+      for (Path sstPath : sstPaths) {
+        sstSize += Files.size(sstPath);
+      }
+      return sstSize;
+    }
+
+ private void createEmptyTarball(File dummyTarFile)
+ throws IOException {
+ FileOutputStream fileOutputStream = new FileOutputStream(dummyTarFile);
+ TarArchiveOutputStream archiveOutputStream =
+ new TarArchiveOutputStream(fileOutputStream);
+ archiveOutputStream.close();
+ }
+
+    // Return the set of sst file names in the tarball.
+    private Set<String> getSstFilenames(File tarball)
+        throws IOException {
+      Set<String> sstFilenames = new HashSet<>();
+ try (TarArchiveInputStream tarInput =
+ new TarArchiveInputStream(new FileInputStream(tarball))) {
+ TarArchiveEntry entry;
+ while ((entry = tarInput.getNextTarEntry()) != null) {
+ String name = entry.getName();
+ if (name.toLowerCase().endsWith(".sst")) {
+ sstFilenames.add(entry.getName());
+ }
+ }
+ }
+ return sstFilenames;
+ }
+
+ // Find the tarball in the dir.
+ private File getTarball(File dir) {
+ File[] fileList = dir.listFiles();
+ assertNotNull(fileList);
+ for (File f : fileList) {
+ if (f.getName().toLowerCase().endsWith(".tar")) {
+ return f;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void resume() throws IOException {
+ }
+
+ @Override
+ public void reset() throws IOException {
+ init();
+ }
+ }
}
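The closing assertions of the test rest on a counting argument: the sum of the per-tarball SST counts equals the size of their union exactly when the sets are pairwise disjoint, i.e. when no SST file was shipped twice. The same check in miniature:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class DisjointnessCheck {
  public static void main(String[] args) {
    List<Set<String>> perTarball = Arrays.asList(
        new HashSet<>(Arrays.asList("a.sst", "b.sst")),
        new HashSet<>(Arrays.asList("c.sst")));
    int total = perTarball.stream().mapToInt(Set::size).sum();
    Set<String> union = new HashSet<>();
    perTarball.forEach(union::addAll);
    // Sizes match if and only if the sets are pairwise disjoint.
    System.out.println(total == union.size());   // prints: true
  }
}
```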
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index f6e14fe97a30..7fb339d24c4c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -52,15 +52,19 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeFile;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.createHardLinkList;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
@@ -84,7 +88,7 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
LoggerFactory.getLogger(OMDBCheckpointServlet.class);
private static final long serialVersionUID = 1L;
private transient BootstrapStateHandler.Lock lock;
-
+ private long maxTotalSstSize = 0;
@Override
public void init() throws ServletException {
@@ -144,9 +148,11 @@ public void writeDbDataToStream(DBCheckpoint checkpoint,
      Set<Path> toExcludeFiles = normalizeExcludeList(toExcludeList,
checkpoint.getCheckpointLocation().toString(),
ServerUtils.getOzoneMetaDirPath(getConf()).toString());
- getFilesForArchive(checkpoint, copyFiles, hardLinkFiles, toExcludeFiles,
- includeSnapshotData(request), excludedList);
- writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+ boolean completed = getFilesForArchive(checkpoint, copyFiles,
+ hardLinkFiles, toExcludeFiles, includeSnapshotData(request),
+ excludedList);
+ writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream,
+ completed);
} catch (Exception e) {
LOG.error("got exception writing to archive " + e);
throw e;
@@ -169,7 +175,7 @@ public static Set<Path> normalizeExcludeList(List<String> toExcludeList,
return paths;
}
-  private void getFilesForArchive(DBCheckpoint checkpoint,
+  private boolean getFilesForArchive(DBCheckpoint checkpoint,
      Set<Path> copyFiles,
      Map<Path, Path> hardLinkFiles,
      Set<Path> toExcludeFiles,
@@ -177,21 +183,28 @@ private void getFilesForArchive(DBCheckpoint checkpoint,
      List<String> excluded)
throws IOException {
+ maxTotalSstSize = getConf().getLong(
+ OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY,
+ OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT);
+
+ AtomicLong copySize = new AtomicLong(0L);
// Get the active fs files.
Path dir = checkpoint.getCheckpointLocation();
- processDir(dir, copyFiles, hardLinkFiles, toExcludeFiles,
- new HashSet<>(), excluded);
+ if (!processDir(dir, copyFiles, hardLinkFiles, toExcludeFiles,
+ new HashSet<>(), excluded, copySize)) {
+ return false;
+ }
if (!includeSnapshotData) {
- return;
+ return true;
}
// Get the snapshot files.
    Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
OM_SNAPSHOT_DIR);
- processDir(snapshotDir, copyFiles, hardLinkFiles, toExcludeFiles,
- snapshotPaths, excluded);
+ return processDir(snapshotDir, copyFiles, hardLinkFiles, toExcludeFiles,
+ snapshotPaths, excluded, copySize);
}
/**
@@ -234,11 +247,12 @@ private void waitForDirToExist(Path dir) throws IOException {
}
}
-  private void processDir(Path dir, Set<Path> copyFiles,
+  private boolean processDir(Path dir, Set<Path> copyFiles,
      Map<Path, Path> hardLinkFiles,
      Set<Path> toExcludeFiles,
      Set<Path> snapshotPaths,
-      List<String> excluded)
+      List<String> excluded,
+      AtomicLong copySize)
      throws IOException {
    try (Stream<Path> files = Files.list(dir)) {
for (Path file : files.collect(Collectors.toList())) {
@@ -251,13 +265,22 @@ private void processDir(Path dir, Set copyFiles,
LOG.debug("Skipping unneeded file: " + file);
continue;
}
- processDir(file, copyFiles, hardLinkFiles, toExcludeFiles,
- snapshotPaths, excluded);
+ if (!processDir(file, copyFiles, hardLinkFiles, toExcludeFiles,
+ snapshotPaths, excluded, copySize)) {
+ return false;
+ }
} else {
- processFile(file, copyFiles, hardLinkFiles, toExcludeFiles, excluded);
+ long fileSize = processFile(file, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded);
+ if (copySize.get() + fileSize > maxTotalSstSize) {
+ return false;
+ } else {
+ copySize.addAndGet(fileSize);
+ }
}
}
}
+ return true;
}
/**
@@ -272,10 +295,11 @@ private void processDir(Path dir, Set<Path> copyFiles,
* @param excluded The list of db files that actually were excluded.
*/
@VisibleForTesting
-  public static void processFile(Path file, Set<Path> copyFiles,
+  public static long processFile(Path file, Set<Path> copyFiles,
      Map<Path, Path> hardLinkFiles,
      Set<Path> toExcludeFiles,
-      List<String> excluded) {
+      List<String> excluded) throws IOException {
+ long fileSize = 0;
if (toExcludeFiles.contains(file)) {
excluded.add(file.toString());
} else {
@@ -297,6 +321,7 @@ public static long processFile(Path file, Set<Path> copyFiles,
} else {
// Add to tarball.
copyFiles.add(file);
+ fileSize = Files.size(file);
}
}
} else {
@@ -304,6 +329,7 @@ public static long processFile(Path file, Set<Path> copyFiles,
copyFiles.add(file);
}
}
+ return fileSize;
}
// If fileName exists in "files" parameter,
@@ -326,14 +352,20 @@ private boolean includeSnapshotData(HttpServletRequest request) {
  private void writeFilesToArchive(Set<Path> copyFiles,
      Map<Path, Path> hardLinkFiles,
- ArchiveOutputStream archiveOutputStream)
+ ArchiveOutputStream archiveOutputStream,
+ boolean completed)
throws IOException {
File metaDirPath = ServerUtils.getOzoneMetaDirPath(getConf());
int truncateLength = metaDirPath.toString().length() + 1;
+    Set<Path> filteredCopyFiles = completed ? copyFiles :
+ copyFiles.stream().filter(path ->
+ path.getFileName().toString().toLowerCase().endsWith(".sst")).
+ collect(Collectors.toSet());
+
// Go through each of the files to be copied and add to archive.
- for (Path file : copyFiles) {
+ for (Path file : filteredCopyFiles) {
String fixedFile = truncateFileName(truncateLength, file);
if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
// checkpoint files go to root of tarball
@@ -345,11 +377,15 @@ private void writeFilesToArchive(Set<Path> copyFiles,
includeFile(file.toFile(), fixedFile, archiveOutputStream);
}
- // Create list of hard links.
- if (!hardLinkFiles.isEmpty()) {
- Path hardLinkFile = createHardLinkList(truncateLength, hardLinkFiles);
- includeFile(hardLinkFile.toFile(), OmSnapshotManager.OM_HARDLINK_FILE,
- archiveOutputStream);
+ if (completed) {
+ // Only create the hard link list for the last tarball.
+ if (!hardLinkFiles.isEmpty()) {
+ Path hardLinkFile = createHardLinkList(truncateLength, hardLinkFiles);
+ includeFile(hardLinkFile.toFile(), OmSnapshotManager.OM_HARDLINK_FILE,
+ archiveOutputStream);
+ }
+ // Mark tarball completed.
+ includeRatisSnapshotCompleteFlag(archiveOutputStream);
}
}
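Taken together, getFilesForArchive and processDir now implement a size-capped batch: files accumulate until the next one would exceed the budget, the partial tarball ships with SST files only (no hard-link list and no completion flag), and the follower comes back for the rest. A flattened sketch of just that decision, assuming a plain list of candidates (the real code recurses through directories and tracks hard links and exclusions separately):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class SizeCappedBatch {
  // Fill 'batch' until adding the next file would exceed maxBytes.
  // Returns true if every candidate fit, i.e. this batch is the last one.
  static boolean fillBatch(List<Path> candidates, long maxBytes,
      List<Path> batch) throws IOException {
    long total = 0;
    for (Path p : candidates) {
      long size = Files.size(p);
      if (total + size > maxBytes) {
        return false;   // over budget: ship this batch, then resume
      }
      total += size;
      batch.add(p);
    }
    return true;        // everything fit: safe to add the completion flag
  }
}
```

Note that processFile returns a nonzero size only for SST files that will actually be copied; excluded files, hard links, and non-SST files do not count against the budget.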
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
index eac63a54bc7c..0b432f159259 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
@@ -117,6 +117,14 @@ public static void createHardLinks(Path dbPath) throws IOException {
String to = l.split("\t")[0];
Path fullFromPath = Paths.get(dbPath.toString(), from);
Path fullToPath = Paths.get(dbPath.toString(), to);
+ // Make parent dir if it doesn't exist.
+ Path parent = fullToPath.getParent();
+ if ((parent != null) && (!parent.toFile().exists())) {
+ if (!parent.toFile().mkdirs()) {
+ throw new IOException(
+ "Failed to create directory: " + parent.toString());
+ }
+ }
Files.createLink(fullToPath, fullFromPath);
}
if (!hardLinkFile.delete()) {
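For comparison, the same guard can be written with java.nio, where Files.createDirectories is a no-op for an existing directory and so avoids the separate exists()/mkdirs() check-then-act step. A sketch (the patch itself stays with File#mkdirs, matching the surrounding code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class HardLinkHelper {
  // Create 'link' as a hard link to 'existing', first creating any
  // missing parent directories of 'link'.
  static void linkWithParents(Path link, Path existing) throws IOException {
    Path parent = link.getParent();
    if (parent != null) {
      Files.createDirectories(parent);
    }
    Files.createLink(link, existing);
  }
}
```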
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index 6af88cf2411f..52f56deaa69d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -39,6 +39,7 @@
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -340,67 +341,87 @@ public void testExcludeUtilities() throws IOException {
* should be copied, linked, or excluded from the tarball entirely.
*/
@Test
- public void testProcessFile() {
+ public void testProcessFile() throws IOException {
+ Assert.assertTrue(new File(testDir.toString(), "snap1").mkdirs());
+ Assert.assertTrue(new File(testDir.toString(), "snap2").mkdirs());
Path copyFile = Paths.get(testDir.toString(),
"snap1/copyfile.sst");
+ Files.write(copyFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
+ long expectedFileSize = Files.size(copyFile);
Path excludeFile = Paths.get(testDir.toString(),
"snap1/excludeFile.sst");
+ Files.write(excludeFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
Path linkToExcludedFile = Paths.get(testDir.toString(),
"snap2/excludeFile.sst");
+ Files.write(linkToExcludedFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
Path linkToCopiedFile = Paths.get(testDir.toString(),
"snap2/copyfile.sst");
+ Files.write(linkToCopiedFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
Path addToCopiedFiles = Paths.get(testDir.toString(),
"snap1/copyfile2.sst");
+ Files.write(addToCopiedFiles,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
Path addNonSstToCopiedFiles = Paths.get(testDir.toString(),
"snap1/nonSst");
+ Files.write(addNonSstToCopiedFiles,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
    Set<Path> toExcludeFiles = new HashSet<>(
        Collections.singletonList(excludeFile));
    Set<Path> copyFiles = new HashSet<>(Collections.singletonList(copyFile));
    List<String> excluded = new ArrayList<>();
    Map<Path, Path> hardLinkFiles = new HashMap<>();
-
+ long fileSize;
// Confirm the exclude file gets added to the excluded list,
// (and thus is excluded.)
- processFile(excludeFile, copyFiles, hardLinkFiles, toExcludeFiles,
- excluded);
+ fileSize = processFile(excludeFile, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded);
Assert.assertEquals(excluded.size(), 1);
Assert.assertEquals((excluded.get(0)), excludeFile.toString());
Assert.assertEquals(copyFiles.size(), 1);
Assert.assertEquals(hardLinkFiles.size(), 0);
+ Assert.assertEquals(fileSize, 0);
excluded = new ArrayList<>();
// Confirm the linkToExcludedFile gets added as a link.
- processFile(linkToExcludedFile, copyFiles, hardLinkFiles, toExcludeFiles,
- excluded);
+ fileSize = processFile(linkToExcludedFile, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 1);
Assert.assertEquals(hardLinkFiles.size(), 1);
Assert.assertEquals(hardLinkFiles.get(linkToExcludedFile), excludeFile);
+ Assert.assertEquals(fileSize, 0);
hardLinkFiles = new HashMap<>();
// Confirm the linkToCopiedFile gets added as a link.
- processFile(linkToCopiedFile, copyFiles, hardLinkFiles, toExcludeFiles,
- excluded);
+ fileSize = processFile(linkToCopiedFile, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 1);
Assert.assertEquals(hardLinkFiles.size(), 1);
Assert.assertEquals(hardLinkFiles.get(linkToCopiedFile), copyFile);
+ Assert.assertEquals(fileSize, 0);
hardLinkFiles = new HashMap<>();
// Confirm the addToCopiedFiles gets added to list of copied files
- processFile(addToCopiedFiles, copyFiles, hardLinkFiles, toExcludeFiles,
- excluded);
+ fileSize = processFile(addToCopiedFiles, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 2);
Assert.assertTrue(copyFiles.contains(addToCopiedFiles));
+ Assert.assertEquals(fileSize, expectedFileSize);
copyFiles = new HashSet<>(Collections.singletonList(copyFile));
// Confirm the addNonSstToCopiedFiles gets added to list of copied files
- processFile(addNonSstToCopiedFiles, copyFiles, hardLinkFiles,
+ fileSize = processFile(addNonSstToCopiedFiles, copyFiles, hardLinkFiles,
toExcludeFiles, excluded);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 2);
+ Assert.assertEquals(fileSize, 0);
Assert.assertTrue(copyFiles.contains(addNonSstToCopiedFiles));
}