@@ -542,6 +542,12 @@ public final class OzoneConsts {
*/
public static final String ETAG = "ETag";

/**
* A constant string used as a separator in various contexts within
* the OMDBCheckpoint functions.
*/
public static final String OM_SNAPSHOT_SEPARATOR = "-";

private OzoneConsts() {
// Never Constructed
}
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
import static org.apache.hadoop.hdds.HddsUtils.toProtobuf;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_SEPARATOR;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -57,7 +58,6 @@ public final class SnapshotInfo implements Auditable, CopyObject<SnapshotInfo> {
SnapshotInfo::getProtobuf,
SnapshotInfo.class);

private static final String SEPARATOR = "-";
private static final long INVALID_TIMESTAMP = -1;
private static final UUID INITIAL_SNAPSHOT_ID = UUID.randomUUID();

@@ -548,7 +548,7 @@ public Map<String, String> toAuditMap() {
public static String getCheckpointDirName(UUID snapshotId) {
Objects.requireNonNull(snapshotId,
"SnapshotId is needed to create checkpoint directory");
return SEPARATOR + snapshotId;
return OM_SNAPSHOT_SEPARATOR + snapshotId;
}

/**
@@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.om;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
@@ -28,8 +29,10 @@
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doCallRealMethod;
@@ -54,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -68,6 +72,7 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -77,13 +82,22 @@
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneSnapshot;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

/**
* Class used for testing the OM DB Checkpoint provider servlet using inode based transfer logic.
@@ -106,6 +120,9 @@ public class TestOMDbCheckpointServletInodeBasedXfer {
@BeforeEach
void init() throws Exception {
conf = new OzoneConfiguration();
// ensure cache entries are not evicted so that snapshot DBs are not closed
conf.setTimeDuration(OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL,
100, TimeUnit.MINUTES);
}

@AfterEach
@@ -193,33 +210,12 @@ public void write(int b) throws IOException {

@Test
void testContentsOfTarballWithSnapshot() throws Exception {
setupCluster();
setupMocks();
when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true");
String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5);
String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5);
// Create a "spy" dbstore keep track of the checkpoint.
writeData(volumeName, bucketName, true);
DBStore dbStore = om.getMetadataManager().getStore();
DBStore spyDbStore = spy(dbStore);
AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>();
when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> {
DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
// Don't delete the checkpoint, because we need to compare it
// with the snapshot data.
doNothing().when(checkpoint).cleanupCheckpoint();
realCheckpoint.set(checkpoint);
return checkpoint;
});
// Init the mock with the spyDbstore
doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),
eq(false), any(), any(), eq(false));
omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
false,
om.getOmAdminUsernames(), om.getOmAdminGroups(), false);

setupClusterAndMocks(volumeName, bucketName, realCheckpoint);
DBStore dbStore = om.getMetadataManager().getStore();
// Get the tarball.
when(responseMock.getOutputStream()).thenReturn(servletOutputStream);
omDbCheckpointServletMock.doGet(requestMock, responseMock);
String testDirName = folder.resolve("testDir").toString();
String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
@@ -252,6 +248,8 @@ void testContentsOfTarballWithSnapshot() throws Exception {
populateInodesOfFilesInDirectory(dbStore, Paths.get(snapshotPath),
inodesFromOmDataDir, hardLinkMapFromOmData);
}
populateInodesOfFilesInDirectory(dbStore, Paths.get(dbStore.getRocksDBCheckpointDiffer().getSSTBackupDir()),
inodesFromOmDataDir, hardLinkMapFromOmData);
Path hardlinkFilePath =
newDbDir.toPath().resolve(OmSnapshotManager.OM_HARDLINK_FILE);
Map<String, List<String>> hardlinkMapFromTarball = readFileToMap(hardlinkFilePath.toString());
@@ -278,13 +276,150 @@ void testContentsOfTarballWithSnapshot() throws Exception {
assertFalse(hardlinkFilePath.toFile().exists());
}

/**
* Verifies that a manually added entry in the snapshot's deleted table
* is persisted and can be retrieved from the snapshot DB loaded from the OM DB checkpoint.
*/
@Test
public void testSnapshotDBConsistency() throws Exception {
String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5);
String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5);
AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>();
setupClusterAndMocks(volumeName, bucketName, realCheckpoint);
List<OzoneSnapshot> snapshots = new ArrayList<>();
client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
.forEachRemaining(snapshots::add);
OzoneSnapshot snapshotToModify = snapshots.get(0);
String dummyKey = "dummyKey";
writeDummyKeyToDeleteTableOfSnapshotDB(snapshotToModify, bucketName, volumeName, dummyKey);
// Get the tarball.
omDbCheckpointServletMock.doGet(requestMock, responseMock);
String testDirName = folder.resolve("testDir").toString();
String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
File newDbDir = new File(newDbDirName);
assertTrue(newDbDir.mkdirs());
FileUtil.unTar(tempFile, newDbDir);
Set<Path> allPathsInTarball = getAllPathsInTarball(newDbDir);
// create hardlinks now
OmSnapshotUtils.createHardLinks(newDbDir.toPath());
for (Path old : allPathsInTarball) {
assertTrue(old.toFile().delete());
}
Path snapshotDbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
OM_DB_NAME + "-" + snapshotToModify.getSnapshotId());
deleteWalFiles(snapshotDbDir);
assertTrue(Files.exists(snapshotDbDir));
String value = getValueFromSnapshotDeleteTable(dummyKey, snapshotDbDir.toString());
assertNotNull(value);
}

private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
.collect(Collectors.toList());
for (Path p : files) {
Files.delete(p);
}
}
}

private static Set<Path> getAllPathsInTarball(File newDbDir) throws IOException {
Set<Path> allPathsInTarball = new HashSet<>();
try (Stream<Path> filesInTarball = Files.list(newDbDir.toPath())) {
List<Path> files = filesInTarball.collect(Collectors.toList());
for (Path p : files) {
File file = p.toFile();
if (file.getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) {
continue;
}
allPathsInTarball.add(p);
}
}
return allPathsInTarball;
}

private void writeDummyKeyToDeleteTableOfSnapshotDB(OzoneSnapshot snapshotToModify, String bucketName,
String volumeName, String keyName)
throws IOException {
try (UncheckedAutoCloseableSupplier<OmSnapshot> supplier = om.getOmSnapshotManager()
.getSnapshot(snapshotToModify.getSnapshotId())) {
OmSnapshot omSnapshot = supplier.get();
OmKeyInfo dummyOmKeyInfo =
new OmKeyInfo.Builder().setBucketName(bucketName).setVolumeName(volumeName).setKeyName(keyName)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE)).build();
RepeatedOmKeyInfo dummyRepeatedKeyInfo =
new RepeatedOmKeyInfo.Builder().setOmKeyInfos(Collections.singletonList(dummyOmKeyInfo)).build();
omSnapshot.getMetadataManager().getDeletedTable().put(dummyOmKeyInfo.getKeyName(), dummyRepeatedKeyInfo);
}
}

private void setupClusterAndMocks(String volumeName, String bucketName,
AtomicReference<DBCheckpoint> realCheckpoint) throws Exception {
setupCluster();
setupMocks();
om.getKeyManager().getSnapshotSstFilteringService().pause();
when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true");
// Create a "spy" dbstore keep track of the checkpoint.
writeData(volumeName, bucketName, true);
DBStore dbStore = om.getMetadataManager().getStore();
DBStore spyDbStore = spy(dbStore);
when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> {
DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
// Don't delete the checkpoint, because we need to compare it
// with the snapshot data.
doNothing().when(checkpoint).cleanupCheckpoint();
realCheckpoint.set(checkpoint);
return checkpoint;
});
// Init the mock with the spyDbstore
doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),
eq(false), any(), any(), eq(false));
omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
false,
om.getOmAdminUsernames(), om.getOmAdminGroups(), false);
when(responseMock.getOutputStream()).thenReturn(servletOutputStream);
}

String getValueFromSnapshotDeleteTable(String key, String snapshotDB) {
Reviewer comment (Contributor) — suggested change (make the helper private):
- String getValueFromSnapshotDeleteTable(String key, String snapshotDB) {
+ private String getValueFromSnapshotDeleteTable(String key, String snapshotDB) {

String result = null;
List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
int count = 1;
int deletedTableCFIndex = 0;
cfDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(StandardCharsets.UTF_8)));
for (String cfName : OMDBDefinition.getAllColumnFamilies()) {
if (cfName.equals(OMDBDefinition.DELETED_TABLE)) {
deletedTableCFIndex = count;
}
cfDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(StandardCharsets.UTF_8)));
count++;
}
// For holding handles
List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
try (DBOptions options = new DBOptions().setCreateIfMissing(false).setCreateMissingColumnFamilies(true);
RocksDB db = RocksDB.openReadOnly(options, snapshotDB, cfDescriptors, cfHandles)) {

ColumnFamilyHandle deletedTableCF = cfHandles.get(deletedTableCFIndex); // 0 is default
byte[] value = db.get(deletedTableCF, key.getBytes(StandardCharsets.UTF_8));
if (value != null) {
result = new String(value, StandardCharsets.UTF_8);
}
} catch (Exception e) {
fail("Exception while reading from snapshot DB " + e.getMessage());
} finally {
for (ColumnFamilyHandle handle : cfHandles) {
handle.close();
}
}
return result;
}

public static Map<String, List<String>> readFileToMap(String filePath) throws IOException {
Map<String, List<String>> dataMap = new HashMap<>();
try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) {
String line;
while ((line = reader.readLine()) != null) {
String trimmedLine = line.trim();
if (trimmedLine.isEmpty() || !trimmedLine.contains("\t")) {
if (!trimmedLine.contains("\t")) {
continue;
}
int tabIndex = trimmedLine.indexOf("\t");
@@ -26,6 +26,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.includeSnapshotData;
import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.logEstimatedTarballSize;
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
@@ -50,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.servlet.ServletException;
@@ -255,6 +257,9 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
hardLinkFileMap, getCompactionLogDir());
writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, getSstBackupDir());
// This is done to ensure that all the data to be copied is flushed to the snapshot DB
transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
archiveOutputStream, hardLinkFileMap);
}
writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream);
includeRatisSnapshotCompleteFlag(archiveOutputStream);
@@ -268,6 +273,36 @@
}
}

/**
* Transfers the snapshot data from the specified snapshot directories into the archive output stream,
* handling deduplication and managing resource locking.
*
* @param sstFilesToExclude Set of SST file identifiers to exclude from the archive.
* @param tmpdir Temporary directory for intermediate processing.
* @param snapshotPaths Set of paths to snapshot directories to be processed.
* @param maxTotalSstSize AtomicLong to track the cumulative size of SST files included.
* @param archiveOutputStream Archive output stream to write the snapshot data.
* @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication.
* @throws IOException if an I/O error occurs during processing.
*/
private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Set<Path> snapshotPaths,
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
Map<String, String> hardLinkFileMap) throws IOException {
OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
OMMetadataManager omMetadataManager = om.getMetadataManager();
for (Path snapshotDir : snapshotPaths) {
String snapshotId = OmSnapshotManager.extractSnapshotIDFromCheckpointDirName(snapshotDir.toString());
omMetadataManager.getLock().acquireReadLock(SNAPSHOT_DB_LOCK, snapshotId);
try {
// invalidate closes the snapshot DB
om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId));
writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
} finally {
omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK, snapshotId);
}
}
}

private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
Path tmpdir, Map<String, String> hardLinkFileMap) throws IOException {
@@ -24,6 +24,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_SEPARATOR;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL;
@@ -769,6 +770,15 @@ public static String getSnapshotPath(OzoneConfiguration conf,
OM_DB_NAME + checkpointDirName;
}

public static String extractSnapshotIDFromCheckpointDirName(String snapshotPath) {
Reviewer comment (Contributor): This public static method has no test coverage, is only used by OMDBCheckpointServletInodeBasedXfer, and does not use any internal variables or methods of OmSnapshotManager. It should be moved into OMDBCheckpointServletInodeBasedXfer as a private method.

Otherwise, if it is intended to be used as-is later, please add javadoc and test coverage.
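A minimal unit-test sketch for the coverage requested above (a sketch only: it assumes JUnit 5, that OM_DB_NAME resolves to "om.db", and the checkpoint path literal is illustrative):

@Test
void extractsSnapshotIdFromCheckpointDirName() {
  UUID snapshotId = UUID.randomUUID();
  // Hypothetical checkpoint directory of the form <snapshot-dir>/om.db-<snapshotId>
  String path = "/var/ozone/om/db.snapshots/checkpointState/om.db-" + snapshotId;
  assertEquals(snapshotId.toString(),
      OmSnapshotManager.extractSnapshotIDFromCheckpointDirName(path));
  // A path without the "om.db-" marker should be rejected.
  assertThrows(IllegalArgumentException.class,
      () -> OmSnapshotManager.extractSnapshotIDFromCheckpointDirName("/tmp/not-a-checkpoint"));
}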

// Find "om.db-" in the path and return whatever comes after
int index = snapshotPath.lastIndexOf(OM_DB_NAME);
if (index == -1 || index + OM_DB_NAME.length() + OM_SNAPSHOT_SEPARATOR.length() >= snapshotPath.length()) {
throw new IllegalArgumentException("Invalid snapshot path " + snapshotPath);
}
return snapshotPath.substring(index + OM_DB_NAME.length() + OM_SNAPSHOT_SEPARATOR.length());
}

public static String getSnapshotLocalPropertyYamlPath(OzoneConfiguration conf,
SnapshotInfo snapshotInfo) {
return getSnapshotPath(conf, snapshotInfo) + ".yaml";