Skip to content
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2021,6 +2021,14 @@
This fallback approach is not recommended for production environments.
</description>
</property>
<property>
<name>ozone.om.ratis.snapshot.max.total.sst.size</name>
<value>100000000</value>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the unit (MB/GB) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just bytes. Should be MB. I'll change it.

<tag>OZONE, OM, RATIS</tag>
<description>
Maximum total size, in bytes, of the SST files included in a single OM Ratis snapshot tarball. When the SST files of a snapshot exceed this size, the snapshot is transferred in multiple tarballs.
</description>
</property>
<property>
<name>ozone.om.snapshot.provider.socket.timeout</name>
<value>5000s</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ public final class HddsServerUtil {
// Private constructor: HddsServerUtil is a static utility class and
// must never be instantiated.
private HddsServerUtil() {
}

// Name of the zero-length marker entry appended as the last entry of an
// OM Ratis snapshot tarball; its presence tells the receiver that the
// tarball (and the overall snapshot transfer) is complete.
public static final String OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME =
"OZONE_RATIS_SNAPSHOT_COMPLETE";

private static final Logger LOG = LoggerFactory.getLogger(
HddsServerUtil.class);

Expand Down Expand Up @@ -590,6 +593,7 @@ public static void writeDBCheckpointToStream(
}
}
}
includeRatisSnapshotCompleteFlag(archiveOutputStream);
}
}

Expand All @@ -605,6 +609,20 @@ public static void includeFile(File file, String entryName,
archiveOutputStream.closeArchiveEntry();
}

/**
 * Marks the tarball as complete by appending a zero-length entry named
 * {@link #OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME}. Receivers use the
 * presence of this entry to know the snapshot transfer finished.
 *
 * @param archiveOutput the archive stream to append the marker entry to
 * @throws IOException if the marker entry cannot be written
 */
public static void includeRatisSnapshotCompleteFlag(
    ArchiveOutputStream archiveOutput) throws IOException {
  File file = File.createTempFile(
      OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME, "");
  // The temp file only exists to provide an (empty) entry body; without
  // this it would be leaked on disk for every snapshot tarball created.
  file.deleteOnExit();
  String entryName = OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
  includeFile(file, entryName, archiveOutput);
}

/**
 * Returns true if the snapshot-complete marker entry exists in the
 * given directory, i.e. the tarball transfer finished successfully.
 *
 * @param dir directory into which the snapshot tarball was extracted
 * @return whether the completion marker file is present
 */
static boolean ratisSnapshotComplete(Path dir) {
  Path marker = dir.resolve(OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME);
  return marker.toFile().exists();
}

// optimize ugi lookup for RPC operations to avoid a trip through
// UGI.getCurrentUser which is synch'ed
public static UserGroupInformation getRemoteUser() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.hdds.utils.HddsServerUtil.ratisSnapshotComplete;
import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_CANDIDATE_DIR;

/**
Expand Down Expand Up @@ -110,21 +111,26 @@ public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID)
"reloading state from the snapshot.", leaderNodeID);
checkLeaderConsistency(leaderNodeID);

String snapshotFileName = getSnapshotFileName(leaderNodeID);
File targetFile = new File(snapshotDir, snapshotFileName);
downloadSnapshot(leaderNodeID, targetFile);
LOG.info("Successfully download the latest snapshot {} from leader OM: {}",
targetFile, leaderNodeID);

numDownloaded.incrementAndGet();
injectPause();

RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
candidateDir, true);
LOG.info("Successfully untar the downloaded snapshot {} at {}.", targetFile,
checkpoint.getCheckpointLocation());

return checkpoint;
while (true) {
String snapshotFileName = getSnapshotFileName(leaderNodeID);
File targetFile = new File(snapshotDir, snapshotFileName);
downloadSnapshot(leaderNodeID, targetFile);
LOG.info(
"Successfully download the latest snapshot {} from leader OM: {}",
targetFile, leaderNodeID);

numDownloaded.incrementAndGet();
injectPause();

RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
candidateDir, true);
LOG.info("Successfully untar the downloaded snapshot {} at {}.",
targetFile, checkpoint.getCheckpointLocation());
if (ratisSnapshotComplete(checkpoint.getCheckpointLocation())) {
LOG.info("Ratis snapshot transfer is complete.");
return checkpoint;
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ private OMConfigKeys() {
public static final long
OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT = 400000;

// Maximum total size, in bytes, of SST files included in a single OM
// Ratis snapshot tarball; larger snapshots are sent as multiple tarballs.
public static final String OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY
= "ozone.om.ratis.snapshot.max.total.sst.size";
// Default: 100_000_000 bytes (100 MB, decimal).
public static final long
OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT = 100_000_000;

// OM Ratis server configurations
public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
= "ozone.om.ratis.server.request.timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.commons.io.FileUtils;

import static org.apache.hadoop.hdds.recon.ReconConfig.ConfigStrings.OZONE_RECON_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
Expand Down Expand Up @@ -703,7 +704,9 @@ private Set<String> getFiles(Path path, int truncateLength,
if (file.toFile().isDirectory()) {
getFiles(file, truncateLength, fileSet);
}
if (!file.getFileName().toString().startsWith("fabricated")) {
String filename = file.getFileName().toString();
if (!filename.startsWith("fabricated") &&
!filename.startsWith(OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME)) {
fileSet.add(truncateFileName(truncateLength, file));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.hadoop.ozone.om;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.ExitManager;
Expand Down Expand Up @@ -64,15 +67,19 @@
import org.slf4j.event.Level;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -83,6 +90,7 @@
import java.util.stream.Stream;

import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
Expand Down Expand Up @@ -197,6 +205,13 @@ public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
}
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);

List<Set<String>> sstSetList = new ArrayList<>();
FaultInjector faultInjector =
new SnapshotMaxSizeInjector(leaderOM,
followerOM.getOmSnapshotProvider().getSnapshotDir(),
sstSetList);
followerOM.getOmSnapshotProvider().setInjector(faultInjector);

// Create some snapshots, each with new keys
int keyIncrement = 10;
String snapshotNamePrefix = "snapshot";
Expand Down Expand Up @@ -283,6 +298,17 @@ public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
*/

checkSnapshot(leaderOM, followerOM, snapshotName, keys, snapshotInfo);
int sstFileCount = 0;
Set<String> sstFileUnion = new HashSet<>();
for (Set<String> sstFiles : sstSetList) {
sstFileCount += sstFiles.size();
sstFileUnion.addAll(sstFiles);
}
// Confirm that there were multiple tarballs.
assertTrue(sstSetList.size() > 1);
// Confirm that there was no overlap of sst files
// between the individual tarballs.
assertEquals(sstFileUnion.size(), sstFileCount);
}

private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,
Expand Down Expand Up @@ -1110,4 +1136,108 @@ public void reset() throws IOException {
init();
}
}

// Interrupts the tarball download process to test creation of
// multiple tarballs as needed when the tarball size exceeds the max.
private static class SnapshotMaxSizeInjector extends FaultInjector {
  // OM whose config is shrunk to force multi-tarball transfer.
  private final OzoneManager om;
  // Number of times pause() has been invoked (1 = first tarball).
  private int count;
  // Directory the follower downloads snapshot tarballs into.
  private final File snapshotDir;
  // Output: one set of SST file names per received tarball.
  private final List<Set<String>> sstSetList;
  // Scratch dir used to untar the first tarball for size measurement.
  private final Path tempDir;

  SnapshotMaxSizeInjector(OzoneManager om, File snapshotDir,
      List<Set<String>> sstSetList) throws IOException {
    this.om = om;
    this.snapshotDir = snapshotDir;
    this.sstSetList = sstSetList;
    this.tempDir = Files.createTempDirectory("tmpDirPrefix");
    init();
  }

  @Override
  public void init() {
  }

  @Override
  // Pause each time a tarball is received, to process it.
  public void pause() throws IOException {
    count++;
    File tarball = getTarball(snapshotDir);
    // First time through, get total size of sst files and reduce
    // max size config. That way next time through, we get multiple
    // tarballs.
    if (count == 1) {
      long sstSize = getSizeOfSstFiles(tarball);
      om.getConfiguration().setLong(
          OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, sstSize / 2);
      // Now empty the tarball to restart the download
      // process from the beginning.
      createEmptyTarball(tarball);
    } else {
      // Each time we get a new tarball add a set of
      // its sst files to the list, (i.e. one per tarball.)
      sstSetList.add(getSstFilenames(tarball));
    }
  }

  // Returns the total size, in bytes, of the .sst files in the tarball,
  // by untarring it into tempDir and summing the extracted file sizes.
  private long getSizeOfSstFiles(File tarball) throws IOException {
    FileUtil.unTar(tarball, tempDir.toFile());
    // Files.walk returns a stream that must be closed to release the
    // underlying directory handles; collect inside try-with-resources.
    List<Path> sstPaths;
    try (Stream<Path> walk = Files.walk(tempDir)) {
      sstPaths = walk.filter(
          path -> path.toString().endsWith(".sst"))
          .collect(Collectors.toList());
    }
    long sstSize = 0;
    for (Path sstPath : sstPaths) {
      sstSize += Files.size(sstPath);
    }
    return sstSize;
  }

  // Overwrites the tarball with a valid but empty tar archive so the
  // follower discards it and restarts the download.
  private void createEmptyTarball(File dummyTarFile)
      throws IOException {
    // try-with-resources ensures both streams are closed even if the
    // TarArchiveOutputStream constructor throws.
    try (FileOutputStream fileOutputStream =
             new FileOutputStream(dummyTarFile);
         TarArchiveOutputStream archiveOutputStream =
             new TarArchiveOutputStream(fileOutputStream)) {
      // Writing no entries yields a valid empty tar on close.
    }
  }

  // Returns the set of .sst entry names contained in the tarball.
  private Set<String> getSstFilenames(File tarball)
      throws IOException {
    Set<String> sstFilenames = new HashSet<>();
    try (TarArchiveInputStream tarInput =
         new TarArchiveInputStream(new FileInputStream(tarball))) {
      TarArchiveEntry entry;
      while ((entry = tarInput.getNextTarEntry()) != null) {
        String name = entry.getName();
        if (name.toLowerCase().endsWith(".sst")) {
          sstFilenames.add(entry.getName());
        }
      }
    }
    return sstFilenames;
  }

  // Finds the tarball in the dir; returns null if none is present.
  private File getTarball(File dir) {
    File[] fileList = dir.listFiles();
    assertNotNull(fileList);
    for (File f : fileList) {
      if (f.getName().toLowerCase().endsWith(".tar")) {
        return f;
      }
    }
    return null;
  }

  @Override
  public void resume() throws IOException {
  }

  @Override
  public void reset() throws IOException {
    init();
  }
}
}
Loading