diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index cb4490c2c1d..b2f6ed8af60 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -75,8 +75,6 @@ public final class OzoneConsts { "/serviceList"; public static final String OZONE_DB_CHECKPOINT_HTTP_ENDPOINT = "/dbCheckpoint"; - public static final String OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2 = - "/v2/dbCheckpoint"; // Ozone File System scheme public static final String OZONE_URI_SCHEME = "o3fs";

diff --git a/hadoop-hdds/docs/content/design/om-bootstrapping-with-snapshots.md b/hadoop-hdds/docs/content/design/om-bootstrapping-with-snapshots.md deleted file mode 100644 index b1d7cfa2f60..00000000000 --- a/hadoop-hdds/docs/content/design/om-bootstrapping-with-snapshots.md +++ /dev/null @@ -1,148 +0,0 @@

---
title: OM Bootstrapping with Snapshots
summary: Design for the OM bootstrapping process that is snapshot aware
date: 2025-08-05
jira: HDDS-12090
status: implemented
author: Swaminathan Balachandran
---

# OM Bootstrapping with Snapshots

# Problem Statement:

The current bootstrapping mechanism for OM is inconsistent when dealing with snapshotted OM RocksDBs. Bootstrapping occurs without any locking, and active transactions may still modify snapshot RocksDBs during the process. This can leave a corrupted RocksDB instance on the follower OM after bootstrapping. To resolve this, the bootstrapping process must operate on a consistent view of the system state.

Jira ticket: [https://issues.apache.org/jira/browse/HDDS-12090](https://issues.apache.org/jira/browse/HDDS-12090)

# Background on Snapshots

## Snapshot Operations

When a snapshot is taken on an Ozone bucket, the following steps occur:

1. A RocksDB checkpoint of the active `om.db` is created.
2. Deleted entries are removed from the `deletedKeyTable` and `deletedDirTable` in the Active Object Store (AOS) RocksDB. This prevents blocks from being purged without first checking for the key's presence in the correct snapshot in the snapshot chain.
3. A new entry is added to the `snapshotInfoTable` in the AOS RocksDB.

## Current Bootstrap Model:

In the current model, the follower OM issues an HTTP request to the leader OM, which provides a consistent view of its state. Before bucket snapshots were introduced, this process relied solely on an AOS RocksDB checkpoint. However, with snapshots, multiple RocksDB instances (AOS RocksDB + snapshot RocksDBs) must be handled, which complicates the process.

### Workflow:

* Follower Initiation:
  Sends an exclude list of files already copied in previous batches (a request sketch follows this list).
* Leader Actions:
  * Creates an AOS RocksDB checkpoint.
  * Performs a directory walk through:
    * AOS RocksDB checkpoint directory.
    * Snapshot RocksDB directories.
    * Backup SST file directory (compaction backup directory).
  * Identifies unique files to be copied in the next batch.
  * Transfers files in batches, recreating hardlinks on the follower side as needed.
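As a rough illustration of the follower-initiated request above, the sketch below builds the leader's checkpoint URL much as `OMNodeDetails.getOMDBCheckpointEndpointUrl` does elsewhere in this patch. The class name, the host/port handling, and the literal parameter values are assumptions made for the sketch; only the endpoint path and the intent of the parameters come from the patch.

```java
import java.net.URL;
import org.apache.http.client.utils.URIBuilder;

/** Sketch only: how a follower could build the leader's /dbCheckpoint request. */
public final class CheckpointRequestSketch {

  // Mirrors OZONE_DB_CHECKPOINT_HTTP_ENDPOINT in OzoneConsts.
  private static final String DB_CHECKPOINT_ENDPOINT = "/dbCheckpoint";
  // Assumed literal values for OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA and
  // OZONE_DB_CHECKPOINT_REQUEST_FLUSH.
  private static final String INCLUDE_SNAPSHOT_DATA = "includeSnapshotData";
  private static final String FLUSH_BEFORE_CHECKPOINT = "flushBeforeCheckpoint";

  private CheckpointRequestSketch() { }

  static URL checkpointUrl(String leaderHost, int httpPort, boolean flush) throws Exception {
    return new URIBuilder()
        .setScheme("http")
        .setHost(leaderHost)
        .setPort(httpPort)
        .setPath(DB_CHECKPOINT_ENDPOINT)
        .addParameter(INCLUDE_SNAPSHOT_DATA, "true")
        .addParameter(FLUSH_BEFORE_CHECKPOINT, flush ? "true" : "false")
        .build()
        .toURL();
  }
}
```

The exclude list itself is sent with the request as the parameter named by `OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST`, which the checkpoint servlet accepts either as a query parameter or as form data (see `parseFormDataParameters` in this patch).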
### Issues with the Current Model

1. Active transactions during bootstrapping may modify snapshot RocksDBs, leading to inconsistencies.
2. Partial data copies can occur when double-buffer flushes or other snapshot-related operations are in progress.
3. Large snapshot data sizes (often in GBs) require multi-batch transfers, increasing the risk of data corruption.

# Proposed Fixes

## Locking the Snapshot Cache

The snapshot cache is the class responsible for maintaining all RocksDB handles corresponding to snapshots. The cache closes these handles from time to time when no thread in the system still holds a reference to them, so any operation on a snapshot goes through the snapshot cache and increases that snapshot's reference count. Introducing a lock on the snapshot cache prevents new threads from requesting a snapshot RocksDB handle, so any operation performed under this lock sees a consistent view of all snapshots. The only downside is that the lock also blocks the double-buffer thread, so any operation performed under the lock has to be lightweight enough that it does not run for a long period of time. (P.S. With Sumit's optimized gatekeeping model, which removes the double buffer from OM, only snapshot operations would be blocked; that is acceptable since those operations are fired only by background threads.)

With such a lock in place, there is a way to get a consistent snapshot of the entire OM. The sections below dive into the various approaches to the overall bootstrap flow.

## Approach 1 (Batching files over multiple tarballs):

This approach builds on the current model by introducing size thresholds to manage locks and data transfers more efficiently.

### Workflow:

1. Follower Initiation:
   * Sends an exclude list of previously copied files (identified by `inodeId`).
2. Leader Directory Walk:
   * Walks through AOS RocksDB, snapshot RocksDBs, and backup SST directories to identify files to transfer.
   * Compares against the exclude list to avoid duplicate transfers.
3. If the total size of the files to be copied exceeds **ozone.om.ratis.snapshot.lock.max.total.size.threshold**, the files are sent directly over the stream as a tarball in which each entry is named by the file's `inodeId`.
4. If the total size of the files to be copied is less than or equal to **ozone.om.ratis.snapshot.lock.max.total.size.threshold**, the snapshot cache lock is taken after waiting for the snapshot cache to drain completely (no snapshot RocksDB should remain open). Under the lock the following operations are performed (see the sketch after this section):
   * Take the AOS RocksDB checkpoint.
   * Do a complete directory walk over the AOS checkpoint directory, all snapshot RocksDB directories, and the backup SST file directory (compaction log directory) to determine the files to be copied, excluding any file already present in the exclude list.
   * Add these files to the tarball, again naming each entry by the file's `inodeId`.
5. While the files are iterated, the path of each file and its corresponding `inodeId` are tracked. For the last batch this map is also written as a text file into the final tarball so that all hardlinks can be recreated on the follower node.

The only drawback of this approach is that we might send more data over the network than strictly necessary, because some SST files sent earlier could have been replaced by compaction running concurrently on the active object store. At the same time, since the entire bootstrap operation is expected to finish within a few minutes, the amount of extra data should be minimal: with roughly 30,000 keys written in 2 minutes at about 1 KB per key, at most around 30 MB of additional data would be written.
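The following is a minimal, self-contained sketch of the threshold decision described in steps 3-5 above. The `ReentrantLock`, the placeholder walk and tarball methods, and all names are stand-ins for illustration; only the control flow follows the design text.

```java
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: the Approach 1 batching decision, not the real OM servlet code. */
public class BatchingDecisionSketch {

  // Stand-in for the snapshot cache lock described above.
  private final ReentrantLock snapshotCacheLock = new ReentrantLock();

  long sendNextBatch(List<File> candidates, long maxTotalSizeUnderLock) {
    long totalBytes = candidates.stream().mapToLong(File::length).sum();
    if (totalBytes > maxTotalSizeUnderLock) {
      // Too much data for a locked batch: stream it as-is and let the
      // follower ask again with a larger exclude list (step 3).
      return writeTarball(candidates, /* includeHardlinkMap= */ false);
    }
    // Small enough: block new snapshot DB handle requests, then send the
    // final, consistent batch together with the hardlink map (steps 4-5).
    snapshotCacheLock.lock();
    try {
      List<File> finalBatch = walkCheckpointSnapshotAndBackupDirs();
      return writeTarball(finalBatch, /* includeHardlinkMap= */ true);
    } finally {
      snapshotCacheLock.unlock();
    }
  }

  private List<File> walkCheckpointSnapshotAndBackupDirs() {
    return Collections.emptyList();  // placeholder for the real directory walk
  }

  private long writeTarball(List<File> batch, boolean includeHardlinkMap) {
    return 0L;  // placeholder: entries would be named by inodeId
  }
}
```

Where Approach 1 applies the threshold to everything that still has to be copied, Approach 1.1 below narrows it to the files under the snapshot directories and keeps only hard-link creation under the lock.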
## Approach 1.1:

This approach builds on Approach 1: along with the size threshold that bounds the work done under the lock, only the files changed under the snapshot directories are counted against that threshold.

### Workflow:

1. Follower Initiation:
   * Sends an exclude list of previously copied files (identified by `inodeId`).
2. Leader Directory Walk:
   * Walks through AOS RocksDB, snapshot RocksDBs, and backup SST directories to identify files to transfer.
   * Compares against the exclude list to avoid duplicate transfers.
3. If the total size to be copied, or the number of files to be copied under the snapshot RocksDB directories, exceeds the corresponding threshold (**ozone.om.ratis.snapshot.max.total.sst.size** for the size), the files are sent directly over the stream as a tarball in which each entry is named by the file's `inodeId`.
4. If the data to be copied under the snapshot RocksDB directories is within **ozone.om.ratis.snapshot.max.total.sst.size**, the snapshot cache lock is taken after waiting for the snapshot cache to drain completely (no snapshot RocksDB should remain open). Under the lock the following operations are performed (a sketch of the lock-held hard-linking follows this section):
   * Take the AOS RocksDB checkpoint.
   * Do a complete directory walk over all snapshot RocksDB directories to determine the files to be copied, excluding any file already present in the exclude list.
   * Add hard links for these files to a tmp directory on the leader.
   * Exit the lock.
   * After the lock, write all files under the tmp directory, the AOS RocksDB checkpoint directory, and the compaction backup directory to the tarball. While the files are iterated, the path of each file and its corresponding `inodeId` are tracked; since this is the last batch, this map is also written as a text file into the final tarball to recreate all hardlinks on the follower node.

The drawbacks of this approach are the same as for Approach 1, but the time the lock is held is minimized by performing only very lightweight operations under it. This makes it the preferable approach, since it minimizes the lock wait time imposed on other threads.
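Below is a minimal sketch of the lock-held portion of Approach 1.1, assuming the snapshot files to copy have already been identified. The method and directory names are illustrative; only the use of cheap hard links under the lock mirrors the design.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: hard-link snapshot files into a tmp dir while the lock is held. */
public final class HardlinkUnderLockSketch {

  private HardlinkUnderLockSketch() { }

  /** Called with the snapshot cache lock held; returns the links to archive later. */
  static List<Path> linkSnapshotFiles(List<Path> snapshotFiles, Path tmpDir)
      throws IOException {
    List<Path> links = new ArrayList<>();
    for (Path file : snapshotFiles) {
      // The real flow names entries by inodeId; the original file name is
      // kept here to keep the sketch simple.
      Path link = tmpDir.resolve(file.getFileName());
      Files.createLink(link, file);  // metadata-only operation, no data copy
      links.add(link);
    }
    return links;
  }
}
```

Since `Files.createLink` only adds a directory entry pointing at an existing inode, the lock is held for a time proportional to the number of files rather than the gigabytes of SST data they contain; the actual tarball is written after the lock is released.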
## Approach 2 (Single tarball creation under lock):

Approach 2 proposes to create a single tarball file on disk and stream chunks of that tarball over multiple HTTP batch requests from the follower.
The flow for creating the tarball is:

1. The snapshot cache lock is taken after waiting for the snapshot cache to drain completely (no snapshot RocksDB should remain open). Under the lock the following operations are performed:
   * Take the AOS RocksDB checkpoint.
   * Do a complete directory walk over the AOS RocksDB directory, all snapshot RocksDB directories, and the backup SST file directory (compaction log directory) to determine all files to be copied into a single tarball.
2. This tarball is then streamed batch by batch to the follower in a paginated fashion.

The drawback of this approach is that the double buffer would be blocked for a long time if there is a lot of data to tarball. If the total snapshot size of the OM DBs put together is 1 TB, and the tarball is written to an NVMe drive with a write throughput of around 5 GB/sec, the tarball write alone takes about 205 seconds (1024/5), i.e. roughly 3.5 minutes. Blocking the double-buffer thread for that long is a bad idea, although it would only happen when a snapshot operation is in flight or already in the double-buffer queue.

## Approach 3 (Creating a checkpoint of the snapshot RocksDBs under lock):

Approach 3 proposes to create a RocksDB checkpoint for each snapshot RocksDB in the system, along with the AOS RocksDB, under the snapshot cache lock. Outside of the lock we could either create a single tarball file as in Approach 2, or stream the files to the follower in batches as multiple tarballs as in Approach 1.
The flow for creating the tarball is:

1. The snapshot cache lock is taken after waiting for the snapshot cache to drain completely (no snapshot RocksDB should remain open). Under the lock the following operations are performed:
   * Take the AOS RocksDB checkpoint.
   * Take a RocksDB checkpoint of every snapshot in the system by iterating through the `snapshotInfoTable` of the AOS checkpoint RocksDB.
2. The files in the checkpoint directories are then streamed to the follower as in either Approach 1 or Approach 2.

The drawback of this approach is that it doubles the number of hardlinks in the file system, which could hurt performance during bootstrap on systems where the total number of files and hardlinks is on the order of 5 million.

## Recommendations

Approach 1 is the most balanced solution: it bounds the time spent under the lock by minimizing the I/O performed inside it, controlled by an additional threshold config. It also requires the smallest code change, since it does not differ much from the current approach. Approach 2 may look simpler, but it would mean revamping the entire bootstrap logic currently in place, and it could increase the total time spent inside the lock, which would block the double-buffer thread for extended periods, which is exactly what Approach 1 tries to avoid.

The final approach implemented is Approach 1.1.

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java index eafe85853db..20bdc5d7629 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java @@ -38,15 +38,12 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.ozone.OzoneConsts; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Create and extract archives.
*/ public final class Archiver { static final int MIN_BUFFER_SIZE = 8 * (int) OzoneConsts.KB; // same as IOUtils.DEFAULT_BUFFER_SIZE static final int MAX_BUFFER_SIZE = (int) OzoneConsts.MB; - private static final Logger LOG = LoggerFactory.getLogger(Archiver.class); private Archiver() { // no instances (for now) @@ -114,46 +111,6 @@ public static long includeFile(File file, String entryName, return bytes; } - /** - * Creates a hard link to the specified file in the provided temporary directory, - * adds the linked file as an entry to the archive with the given entry name, writes - * its contents to the archive output, and then deletes the temporary hard link. - *

- * This approach avoids altering the original file and works around limitations - * of certain archiving libraries that may require the source file to be present - * in a specific location or have a specific name. Any errors during the hardlink - * creation or archiving process are logged. - *

- * - * @param file the file to be included in the archive - * @param entryName the name/path under which the file should appear in the archive - * @param archiveOutput the output stream for the archive (e.g., tar) - * @param tmpDir the temporary directory in which to create the hard link - * @return number of bytes copied to the archive for this file - * @throws IOException if an I/O error occurs other than hardlink creation failure - */ - public static long linkAndIncludeFile(File file, String entryName, - ArchiveOutputStream archiveOutput, Path tmpDir) throws IOException { - File link = tmpDir.resolve(entryName).toFile(); - long bytes = 0; - try { - Files.createLink(link.toPath(), file.toPath()); - TarArchiveEntry entry = archiveOutput.createArchiveEntry(link, entryName); - archiveOutput.putArchiveEntry(entry); - try (InputStream input = Files.newInputStream(link.toPath())) { - bytes = IOUtils.copyLarge(input, archiveOutput); - } - archiveOutput.closeArchiveEntry(); - } catch (IOException ioe) { - LOG.error("Couldn't create hardlink for file {} while including it in tarball.", - file.getAbsolutePath(), ioe); - throw ioe; - } finally { - Files.deleteIfExists(link.toPath()); - } - return bytes; - } - public static void extractEntry(ArchiveEntry entry, InputStream input, long size, Path ancestor, Path path) throws IOException { HddsUtils.validatePath(path, ancestor); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java index dae2df9e5c3..629fba1772a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST; import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; -import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import java.io.OutputStream; @@ -123,10 +122,6 @@ public void initialize(DBStore store, DBCheckpointMetrics metrics, } } - public File getBootstrapTempData() { - return bootstrapTempData; - } - private boolean hasPermission(UserGroupInformation user) { // Check ACL for dbCheckpoint only when global Ozone ACL and SPNEGO is // enabled @@ -137,7 +132,7 @@ private boolean hasPermission(UserGroupInformation user) { } } - protected static void logSstFileList(Collection sstList, String msg, int sampleSize) { + private static void logSstFileList(Collection sstList, String msg, int sampleSize) { int count = sstList.size(); if (LOG.isDebugEnabled()) { LOG.debug(msg, count, "", sstList); @@ -204,8 +199,7 @@ private void generateSnapshotCheckpoint(HttpServletRequest request, processMetadataSnapshotRequest(request, response, isFormData, flush); } - @VisibleForTesting - public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response, + private void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response, boolean isFormData, boolean flush) { List excludedSstList = new ArrayList<>(); String[] sstParam = isFormData ? 
@@ -278,22 +272,11 @@ public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServl } } - protected static Set extractSstFilesToExclude(String[] filesInExclusionParam) { - Set sstFilesToExclude = new HashSet<>(); - if (filesInExclusionParam != null) { - sstFilesToExclude.addAll( - Arrays.stream(filesInExclusionParam).filter(s -> s.endsWith(ROCKSDB_SST_SUFFIX)) - .distinct().collect(Collectors.toList())); - logSstFileList(sstFilesToExclude, "Received list of {} SST files to be excluded{}: {}", 5); - } - return sstFilesToExclude; - } - - protected static Set extractFilesToExclude(String[] sstParam) { + protected static Set extractSstFilesToExclude(String[] sstParam) { Set receivedSstFiles = new HashSet<>(); if (sstParam != null) { receivedSstFiles.addAll( - Arrays.stream(sstParam).distinct().collect(Collectors.toList())); + Arrays.stream(sstParam).filter(s -> s.endsWith(ROCKSDB_SST_SUFFIX)).distinct().collect(Collectors.toList())); logSstFileList(receivedSstFiles, "Received list of {} SST files to be excluded{}: {}", 5); } return receivedSstFiles; @@ -309,7 +292,7 @@ public DBCheckpoint getCheckpoint(Path ignoredTmpdir, boolean flush) * @param request the HTTP servlet request * @return array of parsed sst form data parameters for exclusion */ - protected static String[] parseFormDataParameters(HttpServletRequest request) { + private static String[] parseFormDataParameters(HttpServletRequest request) { ServletFileUpload upload = new ServletFileUpload(); List sstParam = new ArrayList<>(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java index 406736f5310..8b194e27b5d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION_DEFAULT; import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; import com.google.common.annotations.VisibleForTesting; @@ -318,24 +319,28 @@ public static File getMetaDir(DBDefinition definition, } /** - * Scan the DB dir and return the existing files, - * including omSnapshot files. + * Scan the DB dir and return the existing SST files, + * including omSnapshot sst files. + * SSTs could be used for avoiding repeated download. * * @param db the file representing the DB to be scanned - * @return the list of file names. If db not exist, will return empty list + * @return the list of SST file name. If db not exist, will return empty list */ - public static List getExistingFiles(File db) throws IOException { + public static List getExistingSstFiles(File db) throws IOException { List sstList = new ArrayList<>(); if (!db.exists()) { return sstList; } + + int truncateLength = db.toString().length() + 1; // Walk the db dir and get all sst files including omSnapshot files. try (Stream files = Files.walk(db.toPath())) { - sstList = files.filter(p -> p.toFile().isFile()) - .map(p -> p.getFileName().toString()). + sstList = + files.filter(path -> path.toString().endsWith(ROCKSDB_SST_SUFFIX)). + map(p -> p.toString().substring(truncateLength)). 
collect(Collectors.toList()); if (LOG.isDebugEnabled()) { - LOG.debug("Scanned files {} in {}.", sstList, db.getAbsolutePath()); + LOG.debug("Scanned SST files {} in {}.", sstList, db.getAbsolutePath()); } } return sstList; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java index b40a1f84e15..777efcf47ea 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java @@ -108,15 +108,14 @@ public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID) LOG.info("Prepare to download the snapshot from leader OM {} and " + "reloading state from the snapshot.", leaderNodeID); checkLeaderConsistency(leaderNodeID); - int numParts = 0; while (true) { String snapshotFileName = getSnapshotFileName(leaderNodeID); File targetFile = new File(snapshotDir, snapshotFileName); downloadSnapshot(leaderNodeID, targetFile); - LOG.info("Successfully download the latest snapshot {} from leader OM: {}, part : {}", - targetFile, leaderNodeID, numParts); - numParts++; + LOG.info( + "Successfully download the latest snapshot {} from leader OM: {}", + targetFile, leaderNodeID); numDownloaded.incrementAndGet(); injectPause(); @@ -154,7 +153,7 @@ void checkLeaderConsistency(String currentLeader) throws IOException { return; } - List files = HAUtils.getExistingFiles(candidateDir); + List files = HAUtils.getExistingSstFiles(candidateDir); if (!files.isEmpty()) { LOG.warn("Candidate DB directory {} is not empty when last leader is " + "null.", candidateDir); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java index 6c69f6fbaf5..e175f957355 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java @@ -18,31 +18,9 @@ package org.apache.hadoop.hdds.utils; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.CALLS_REAL_METHODS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.MockedStatic; /** Test {@link Archiver}. 
*/ class TestArchiver { @@ -68,71 +46,4 @@ void bufferSizeAboveMaximum(long fileSize) { .isEqualTo(Archiver.MAX_BUFFER_SIZE); } - @Test - void testLinkAndIncludeFileSuccessfulHardLink() throws IOException { - Path tmpDir = Files.createTempDirectory("archiver-test"); - File tempFile = File.createTempFile("test-file", ".txt"); - String entryName = "test-entry"; - Files.write(tempFile.toPath(), "Test Content".getBytes(StandardCharsets.UTF_8)); - - TarArchiveOutputStream mockArchiveOutput = mock(TarArchiveOutputStream.class); - TarArchiveEntry mockEntry = new TarArchiveEntry(entryName); - AtomicBoolean isHardLinkCreated = new AtomicBoolean(false); - when(mockArchiveOutput.createArchiveEntry(any(File.class), eq(entryName))) - .thenAnswer(invocation -> { - File linkFile = invocation.getArgument(0); - isHardLinkCreated.set(Files.isSameFile(tempFile.toPath(), linkFile.toPath())); - return mockEntry; - }); - - // Call method under test - long bytesCopied = Archiver.linkAndIncludeFile(tempFile, entryName, mockArchiveOutput, tmpDir); - assertEquals(Files.size(tempFile.toPath()), bytesCopied); - // Verify archive interactions - verify(mockArchiveOutput, times(1)).putArchiveEntry(mockEntry); - verify(mockArchiveOutput, times(1)).closeArchiveEntry(); - assertTrue(isHardLinkCreated.get()); - assertFalse(Files.exists(tmpDir.resolve(entryName))); - // Cleanup - assertTrue(tempFile.delete()); - Files.deleteIfExists(tmpDir); - } - - @Test - void testLinkAndIncludeFileFailedHardLink() throws IOException { - Path tmpDir = Files.createTempDirectory("archiver-test"); - File tempFile = File.createTempFile("test-file", ".txt"); - String entryName = "test-entry"; - Files.write(tempFile.toPath(), "Test Content".getBytes(StandardCharsets.UTF_8)); - - TarArchiveOutputStream mockArchiveOutput = - mock(TarArchiveOutputStream.class); - TarArchiveEntry mockEntry = new TarArchiveEntry("test-entry"); - AtomicBoolean isHardLinkCreated = new AtomicBoolean(false); - when(mockArchiveOutput.createArchiveEntry(any(File.class), eq(entryName))) - .thenAnswer(invocation -> { - File linkFile = invocation.getArgument(0); - isHardLinkCreated.set(Files.isSameFile(tempFile.toPath(), linkFile.toPath())); - return mockEntry; - }); - - // Mock static Files.createLink to throw IOException - try (MockedStatic mockedFiles = mockStatic(Files.class, CALLS_REAL_METHODS)) { - Path linkPath = tmpDir.resolve(entryName); - String errorMessage = "Failed to create hardlink"; - mockedFiles.when(() -> Files.createLink(linkPath, tempFile.toPath())) - .thenThrow(new IOException(errorMessage)); - - IOException thrown = assertThrows(IOException.class, () -> - Archiver.linkAndIncludeFile(tempFile, entryName, mockArchiveOutput, tmpDir) - ); - - assertTrue(thrown.getMessage().contains(errorMessage)); - } - assertFalse(isHardLinkCreated.get()); - - assertTrue(tempFile.delete()); - Files.deleteIfExists(tmpDir); - } - } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java index f8a1acf8739..7e141f1072d 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java @@ -115,11 +115,9 @@ public void downloadSnapshot(String leaderNodeID, File targetFile) concat(String.valueOf(a.length()))) .collect(Collectors.toList())); try (OutputStream outputStream = 
Files.newOutputStream(targetFile.toPath())) { - Set existingSstFiles = HAUtils.getExistingFiles(rdbSnapshotProvider.getCandidateDir()) - .stream() - .filter(fName -> fName.endsWith(".sst") && !fName.equals(".sst")) - .collect(Collectors.toSet()); - writeDBCheckpointToStream(dbCheckpoint, outputStream, existingSstFiles); + writeDBCheckpointToStream(dbCheckpoint, outputStream, + new HashSet<>(HAUtils.getExistingSstFiles( + rdbSnapshotProvider.getCandidateDir()))); } } }; @@ -142,7 +140,7 @@ public void testDownloadDBSnapshotFromLeader() throws Exception { assertTrue(candidateDir.exists()); DBCheckpoint checkpoint; - int before = HAUtils.getExistingFiles( + int before = HAUtils.getExistingSstFiles( rdbSnapshotProvider.getCandidateDir()).size(); assertEquals(0, before); @@ -150,12 +148,12 @@ public void testDownloadDBSnapshotFromLeader() throws Exception { checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(LEADER_ID); File checkpointDir = checkpoint.getCheckpointLocation().toFile(); assertEquals(candidateDir, checkpointDir); - int first = HAUtils.getExistingFiles( + int first = HAUtils.getExistingSstFiles( rdbSnapshotProvider.getCandidateDir()).size(); // Get second snapshot checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(LEADER_ID); - int second = HAUtils.getExistingFiles( + int second = HAUtils.getExistingSstFiles( rdbSnapshotProvider.getCandidateDir()).size(); assertThat(second).withFailMessage("The second snapshot should have more SST files") .isGreaterThan(first); @@ -165,7 +163,7 @@ public void testDownloadDBSnapshotFromLeader() throws Exception { // Get third snapshot checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(LEADER_ID); - int third = HAUtils.getExistingFiles( + int third = HAUtils.getExistingSstFiles( rdbSnapshotProvider.getCandidateDir()).size(); assertThat(third).withFailMessage("The third snapshot should have more SST files") .isGreaterThan(second); @@ -174,7 +172,7 @@ public void testDownloadDBSnapshotFromLeader() throws Exception { // Test cleanup candidateDB rdbSnapshotProvider.init(); - assertEquals(0, HAUtils.getExistingFiles( + assertEquals(0, HAUtils.getExistingSstFiles( rdbSnapshotProvider.getCandidateDir()).size()); } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java index ff51cfa7adc..adcf07d2cb1 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.om.helpers; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; @@ -189,7 +189,7 @@ public URL getOMDBCheckpointEndpointUrl(boolean isHttp, boolean flush) URIBuilder urlBuilder = new URIBuilder(). setScheme(isHttp ? "http" : "https"). setHost(isHttp ? getHttpAddress() : getHttpsAddress()). - setPath(OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2). + setPath(OZONE_DB_CHECKPOINT_HTTP_ENDPOINT). addParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA, "true"). 
addParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH, flush ? "true" : "false"); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java index cbc3709ea1e..a8fdc8848bc 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java @@ -20,7 +20,6 @@ import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf; import static org.apache.hadoop.hdds.HddsUtils.toProtobuf; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_SEPARATOR; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -58,6 +57,7 @@ public final class SnapshotInfo implements Auditable, CopyObject { SnapshotInfo::getProtobuf, SnapshotInfo.class); + private static final String SEPARATOR = "-"; private static final long INVALID_TIMESTAMP = -1; private static final UUID INITIAL_SNAPSHOT_ID = UUID.randomUUID(); @@ -565,7 +565,7 @@ public Map toAuditMap() { public static String getCheckpointDirName(UUID snapshotId) { Objects.requireNonNull(snapshotId, "SnapshotId is needed to create checkpoint directory"); - return OM_SNAPSHOT_SEPARATOR + snapshotId; + return SEPARATOR + snapshotId; } /** diff --git a/hadoop-ozone/dist/src/main/compose/xcompat/lib.sh b/hadoop-ozone/dist/src/main/compose/xcompat/lib.sh index 80bcbd24e2a..5e53515b6f5 100755 --- a/hadoop-ozone/dist/src/main/compose/xcompat/lib.sh +++ b/hadoop-ozone/dist/src/main/compose/xcompat/lib.sh @@ -73,15 +73,6 @@ _read() { compatibility/read.robot } -_test_checkpoint_compatibility() { - _kinit - execute_robot_test ${container} -N "xcompat-cluster-${cluster_version}-client-${client_version}-checkpoint" \ - -v CLIENT_VERSION:${client_version} \ - -v CLUSTER_VERSION:${cluster_version} \ - -v TEST_DATA_DIR:/testdata \ - compatibility/checkpoint.robot -} - test_cross_compatibility() { echo "Starting ${cluster_version} cluster with COMPOSE_FILE=${COMPOSE_FILE}" @@ -116,35 +107,6 @@ test_cross_compatibility() { done done - # Add checkpoint compatibility tests (only for clusters that support checkpoint endpoints) - # Skip checkpoint tests for very old clusters that don't have the endpoints - if [[ "${cluster_version}" < "2.0.0" ]]; then - echo "Skipping checkpoint compatibility tests for cluster ${cluster_version} (checkpoint endpoints not available)" - else - echo "" - echo "==========================================" - echo "Running checkpoint compatibility tests" - echo "==========================================" - - # Test 2.0.0 client (if available) - for client_version in "$@"; do - if [[ "${client_version}" == "2.0.0" ]]; then - echo "Testing 2.0.0 client against ${cluster_version} cluster" - client _test_checkpoint_compatibility - break # Only test 2.0 once - fi - done - - # Test current client (if different from 2.0.0 and available) - for client_version in "$@"; do - if [[ "${client_version}" == "${current_version}" ]]; then - echo "Testing ${current_version} client against ${cluster_version} cluster" - client _test_checkpoint_compatibility - break # Only test current version once - fi - done - fi - KEEP_RUNNING=false stop_docker_env } diff --git a/hadoop-ozone/dist/src/main/smoketest/compatibility/checkpoint.robot b/hadoop-ozone/dist/src/main/smoketest/compatibility/checkpoint.robot deleted file mode 
100644 index e1776ef1a4b..00000000000 --- a/hadoop-ozone/dist/src/main/smoketest/compatibility/checkpoint.robot +++ /dev/null @@ -1,110 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -*** Settings *** -Documentation Checkpoint Compatibility -Resource ../ozone-lib/shell.robot -Resource setup.robot -Test Timeout 5 minutes - -*** Variables *** -${CHECKPOINT_V2_VERSION} 2.1.0 -${OM_HOST} om -${OM_PORT} 9874 - -*** Keywords *** -Download Checkpoint V1 - [Documentation] Download checkpoint using v1 endpoint (/dbCheckpoint) - [Arguments] ${expected_result} - - Log Testing v1 checkpoint endpoint with authentication - - # Try different keytabs based on client version/container - ${download_file} = Set Variable /tmp/checkpoint_v1_${CLIENT_VERSION}.tar.gz - - # Debug: Check keytab availability first - ${keytab_check} = Execute ls -la /etc/security/keytabs/ 2>&1 | head -5 || echo "No keytabs directory" - Log Keytab directory: ${keytab_check} - - # Combine kinit and curl in a single command to preserve Kerberos session - ${combined_cmd} = Set Variable kinit -k -t /etc/security/keytabs/testuser.keytab testuser/scm@EXAMPLE.COM && curl -f --negotiate -u : --connect-timeout 10 --max-time 30 -o ${download_file} http://${OM_HOST}:${OM_PORT}/dbCheckpoint - - Log Executing: ${combined_cmd} - ${result} = Execute and checkrc ${combined_cmd} ${expected_result} - - IF ${expected_result} == 0 - # If we expect success, verify the file was created and has content - ${file_check} = Execute ls -la ${download_file} 2>/dev/null || echo "File not found" - Should Not Contain ${file_check} File not found - Should Contain ${file_check} checkpoint_v1_${CLIENT_VERSION}.tar.gz - Log Successfully downloaded checkpoint via v1 endpoint: ${file_check} - ELSE - Log v1 endpoint failed as expected for this version combination - END - -Download Checkpoint V2 - [Documentation] Download checkpoint using v2 endpoint (/dbCheckpointv2) - [Arguments] ${expected_result} - - Log Testing v2 checkpoint endpoint with authentication - - # Debug: Check keytab availability first (reuse from V1 if already checked) - ${keytab_check} = Execute ls -la /etc/security/keytabs/ 2>&1 | head -5 || echo "No keytabs directory" - Log Keytab directory: ${keytab_check} - - # Combine kinit and curl in a single command to preserve Kerberos session - ${download_file} = Set Variable /tmp/checkpoint_v2_${CLIENT_VERSION}.tar.gz - ${combined_cmd} = Set Variable kinit -k -t /etc/security/keytabs/testuser.keytab testuser/scm@EXAMPLE.COM && curl -f --negotiate -u : --connect-timeout 10 --max-time 30 -o ${download_file} http://${OM_HOST}:${OM_PORT}/v2/dbCheckpoint - - Log Executing: ${combined_cmd} - ${result} = Execute and checkrc ${combined_cmd} ${expected_result} - - IF ${expected_result} == 0 - # If we expect success, verify the file 
was created and has content - ${file_check} = Execute ls -la ${download_file} 2>/dev/null || echo "File not found" - Should Not Contain ${file_check} File not found - Should Contain ${file_check} checkpoint_v2_${CLIENT_VERSION}.tar.gz - Log Successfully downloaded checkpoint via v2 endpoint: ${file_check} - ELSE - Log v2 endpoint failed as expected for this version combination - END - -*** Test Cases *** -Checkpoint V1 Endpoint Compatibility - [Documentation] Test v1 checkpoint endpoint (/dbCheckpoint) - should work for all versions (backward compatibility) - - Log Testing v1 checkpoint endpoint: CLIENT=${CLIENT_VERSION}, CLUSTER=${CLUSTER_VERSION} - - # Both old and new clusters should serve v1 endpoint for backward compatibility - Download Checkpoint V1 0 - -Checkpoint V2 Endpoint Compatibility - [Documentation] Test v2 checkpoint endpoint (/v2/dbCheckpoint) - should only work with new cluster - - Log Testing v2 checkpoint endpoint: CLIENT=${CLIENT_VERSION}, CLUSTER=${CLUSTER_VERSION} - - IF '${CLUSTER_VERSION}' < '${CHECKPOINT_V2_VERSION}' - # Old cluster doesn't have v2 endpoint - should fail with any non-zero exit code - ${result} = Run Keyword And Return Status Download Checkpoint V2 0 - IF not ${result} - Log v2 endpoint correctly failed on old cluster ${CLUSTER_VERSION} (expected failure) - ELSE - Fail v2 endpoint unexpectedly succeeded on old cluster ${CLUSTER_VERSION} - END - ELSE - # New cluster has v2 endpoint - should succeed - Download Checkpoint V2 0 - Log v2 endpoint correctly succeeded on new cluster ${CLUSTER_VERSION} - END diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java index 254a9f39256..790932f2568 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.recon; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -119,7 +119,7 @@ public void testReconGetsSnapshotFromLeader() throws Exception { String expectedUrl = "http://" + (hostname.equals("0.0.0.0") ? 
"localhost" : hostname) + ":" + ozoneManager.get().getHttpServer().getHttpAddress().getPort() + - OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2; + OZONE_DB_CHECKPOINT_HTTP_ENDPOINT; String snapshotUrl = impl.getOzoneManagerSnapshotUrl(); assertEquals(expectedUrl, snapshotUrl); // Write some data diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java index 4639b3cd697..3502c624c1f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java @@ -119,8 +119,6 @@ public void init() throws Exception { responseMock); doCallRealMethod().when(scmDbCheckpointServletMock).getCheckpoint(any(), anyBoolean()); - doCallRealMethod().when(scmDbCheckpointServletMock) - .processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean()); ServletContext servletContextMock = mock(ServletContext.class); when(scmDbCheckpointServletMock.getServletContext()) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java index 3d542785e11..e470b4b47c5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java @@ -229,8 +229,6 @@ public void write(int b) throws IOException { doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), anyBoolean()); - doCallRealMethod().when(omDbCheckpointServletMock) - .processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean()); } @Test diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java deleted file mode 100644 index 0f5c8bae4b4..00000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ /dev/null @@ -1,777 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.om; - -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; -import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR; -import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; -import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.servlet.ServletConfig; -import javax.servlet.ServletContext; -import javax.servlet.ServletOutputStream; -import javax.servlet.WriteListener; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationFactor; -import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.utils.Archiver; -import org.apache.hadoop.hdds.utils.IOUtils; -import org.apache.hadoop.hdds.utils.db.DBCheckpoint; -import 
org.apache.hadoop.hdds.utils.db.DBStore; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.TestDataUtil; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneSnapshot; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; -import org.apache.hadoop.ozone.om.codec.OMDBDefinition; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; -import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; -import org.apache.hadoop.ozone.om.service.KeyDeletingService; -import org.apache.hadoop.ozone.om.service.SnapshotDeletingService; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; -import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.MockedStatic; -import org.rocksdb.ColumnFamilyDescriptor; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.DBOptions; -import org.rocksdb.RocksDB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class used for testing the OM DB Checkpoint provider servlet using inode based transfer logic. - */ -public class TestOMDbCheckpointServletInodeBasedXfer { - - private MiniOzoneCluster cluster; - private OzoneClient client; - private OzoneManager om; - private OzoneConfiguration conf; - @TempDir - private Path folder; - private HttpServletRequest requestMock = null; - private HttpServletResponse responseMock = null; - private OMDBCheckpointServletInodeBasedXfer omDbCheckpointServletMock = null; - private ServletOutputStream servletOutputStream; - private File tempFile; - private static final AtomicInteger COUNTER = new AtomicInteger(); - private static final Logger LOG = - LoggerFactory.getLogger(TestOMDbCheckpointServletInodeBasedXfer.class); - - @BeforeEach - void init() throws Exception { - conf = new OzoneConfiguration(); - // ensure cache entries are not evicted thereby snapshot db's are not closed - conf.setTimeDuration(OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL, - 100, TimeUnit.MINUTES); - } - - @AfterEach - void shutdown() { - IOUtils.closeQuietly(client, cluster); - } - - private void setupCluster() throws Exception { - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); - conf.setBoolean(OZONE_ACL_ENABLED, false); - conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); - cluster.waitForClusterToBeReady(); - client = cluster.newClient(); - om = cluster.getOzoneManager(); - } - - private void setupMocks() throws Exception { - final Path tempPath = folder.resolve("temp" + COUNTER.incrementAndGet() + ".tar"); - tempFile = tempPath.toFile(); - - servletOutputStream = new ServletOutputStream() { - private final OutputStream fileOutputStream = Files.newOutputStream(tempPath); - - @Override - public boolean isReady() { - return true; - } - - @Override - public void setWriteListener(WriteListener writeListener) { - } - - @Override - public void close() throws IOException { - fileOutputStream.close(); - super.close(); - } - - 
@Override - public void write(int b) throws IOException { - fileOutputStream.write(b); - } - }; - - omDbCheckpointServletMock = mock(OMDBCheckpointServletInodeBasedXfer.class); - - BootstrapStateHandler.Lock lock = null; - if (om != null) { - lock = new OMDBCheckpointServlet.Lock(om); - } - doCallRealMethod().when(omDbCheckpointServletMock).init(); - assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getDbStore()); - - requestMock = mock(HttpServletRequest.class); - // Return current user short name when asked - when(requestMock.getRemoteUser()) - .thenReturn(UserGroupInformation.getCurrentUser().getShortUserName()); - responseMock = mock(HttpServletResponse.class); - - ServletContext servletContextMock = mock(ServletContext.class); - when(omDbCheckpointServletMock.getServletContext()) - .thenReturn(servletContextMock); - - when(servletContextMock.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) - .thenReturn(om); - when(requestMock.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH)) - .thenReturn("true"); - - doCallRealMethod().when(omDbCheckpointServletMock).doGet(requestMock, - responseMock); - doCallRealMethod().when(omDbCheckpointServletMock).doPost(requestMock, - responseMock); - - doCallRealMethod().when(omDbCheckpointServletMock) - .writeDbDataToStream(any(), any(), any(), any(), any()); - doCallRealMethod().when(omDbCheckpointServletMock) - .writeDBToArchive(any(), any(), any(), any(), any(), any(), anyBoolean()); - - when(omDbCheckpointServletMock.getBootstrapStateLock()) - .thenReturn(lock); - doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), anyBoolean()); - assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getBootstrapTempData()); - doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirs(any()); - doCallRealMethod().when(omDbCheckpointServletMock). - processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean()); - doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any()); - doCallRealMethod().when(omDbCheckpointServletMock).getCompactionLogDir(); - doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testTarballBatching(boolean includeSnapshots) throws Exception { - String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5); - String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5); - AtomicReference realCheckpoint = new AtomicReference<>(); - setupClusterAndMocks(volumeName, bucketName, realCheckpoint, includeSnapshots); - long maxFileSizeLimit = 4096; - om.getConfiguration().setLong(OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, maxFileSizeLimit); - // Get the tarball. - omDbCheckpointServletMock.doGet(requestMock, responseMock); - String testDirName = folder.resolve("testDir").toString(); - String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; - File newDbDir = new File(newDbDirName); - assertTrue(newDbDir.mkdirs()); - FileUtil.unTar(tempFile, newDbDir); - long totalSize; - try (Stream list = Files.list(newDbDir.toPath())) { - totalSize = list.mapToLong(path -> path.toFile().length()).sum(); - } - boolean obtainedFilesUnderMaxLimit = totalSize < maxFileSizeLimit; - if (!includeSnapshots) { - // If includeSnapshotData flag is set to false , it always sends all data - // in one batch and doesn't respect the max size config. This is how Recon - // uses it today. 
- assertFalse(obtainedFilesUnderMaxLimit); - } else { - assertTrue(obtainedFilesUnderMaxLimit); - } - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testContentsOfTarballWithSnapshot(boolean includeSnapshot) throws Exception { - String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5); - String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5); - AtomicReference realCheckpoint = new AtomicReference<>(); - setupClusterAndMocks(volumeName, bucketName, realCheckpoint, includeSnapshot); - DBStore dbStore = om.getMetadataManager().getStore(); - // Get the tarball. - omDbCheckpointServletMock.doGet(requestMock, responseMock); - String testDirName = folder.resolve("testDir").toString(); - String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; - File newDbDir = new File(newDbDirName); - assertTrue(newDbDir.mkdirs()); - FileUtil.unTar(tempFile, newDbDir); - List snapshotPaths = new ArrayList<>(); - client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) - .forEachRemaining(snapInfo -> snapshotPaths.add(getSnapshotDBPath(snapInfo.getCheckpointDir()))); - Set inodesFromOmDataDir = new HashSet<>(); - Set inodesFromTarball = new HashSet<>(); - try (Stream filesInTarball = Files.list(newDbDir.toPath())) { - List files = filesInTarball.collect(Collectors.toList()); - for (Path p : files) { - File file = p.toFile(); - if (file.getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) { - continue; - } - String inode = getInode(file.getName()); - inodesFromTarball.add(inode); - } - } - Map> hardLinkMapFromOmData = new HashMap<>(); - Path checkpointLocation = realCheckpoint.get().getCheckpointLocation(); - populateInodesOfFilesInDirectory(dbStore, checkpointLocation, - inodesFromOmDataDir, hardLinkMapFromOmData); - int numSnapshots = 0; - if (includeSnapshot) { - for (String snapshotPath : snapshotPaths) { - populateInodesOfFilesInDirectory(dbStore, Paths.get(snapshotPath), - inodesFromOmDataDir, hardLinkMapFromOmData); - numSnapshots++; - } - } - populateInodesOfFilesInDirectory(dbStore, Paths.get(dbStore.getRocksDBCheckpointDiffer().getSSTBackupDir()), - inodesFromOmDataDir, hardLinkMapFromOmData); - Path hardlinkFilePath = - newDbDir.toPath().resolve(OmSnapshotManager.OM_HARDLINK_FILE); - Map> hardlinkMapFromTarball = readFileToMap(hardlinkFilePath.toString()); - - // verify that all entries in hardLinkMapFromOmData are present in hardlinkMapFromTarball. - // entries in hardLinkMapFromOmData are from the snapshots + OM db checkpoint so they - // should be present in the tarball. 
- - for (Map.Entry> entry : hardLinkMapFromOmData.entrySet()) { - String key = entry.getKey(); - List value = entry.getValue(); - assertTrue(hardlinkMapFromTarball.containsKey(key)); - assertEquals(value, hardlinkMapFromTarball.get(key)); - } - // all files from the checkpoint should be in the tarball - assertFalse(inodesFromTarball.isEmpty()); - assertTrue(inodesFromTarball.containsAll(inodesFromOmDataDir)); - - long actualYamlFiles = Files.list(newDbDir.toPath()) - .filter(f -> f.getFileName().toString() - .endsWith(".yaml")).count(); - assertEquals(numSnapshots, actualYamlFiles, - "Number of generated YAML files should match the number of snapshots."); - - // create hardlinks now - OmSnapshotUtils.createHardLinks(newDbDir.toPath(), true); - - if (includeSnapshot) { - List yamlRelativePaths = snapshotPaths.stream().map(path -> { - int startIndex = path.indexOf("db.snapshots"); - if (startIndex != -1) { - return path.substring(startIndex) + ".yaml"; - } - return path + ".yaml"; - }).collect(Collectors.toList()); - - for (String yamlRelativePath : yamlRelativePaths) { - String yamlFileName = Paths.get(newDbDir.getPath(), yamlRelativePath).toString(); - assertTrue(Files.exists(Paths.get(yamlFileName))); - } - } - - assertFalse(hardlinkFilePath.toFile().exists()); - } - - /** - * Verifies that a manually added entry to the snapshot's delete table - * is persisted and can be retrieved from snapshot db loaded from OM DB checkpoint. - */ - @Test - public void testSnapshotDBConsistency() throws Exception { - String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5); - String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5); - AtomicReference realCheckpoint = new AtomicReference<>(); - setupClusterAndMocks(volumeName, bucketName, realCheckpoint, true); - List snapshots = new ArrayList<>(); - client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) - .forEachRemaining(snapshots::add); - OzoneSnapshot snapshotToModify = snapshots.get(0); - String dummyKey = "dummyKey"; - writeDummyKeyToDeleteTableOfSnapshotDB(snapshotToModify, bucketName, volumeName, dummyKey); - // Get the tarball. 
- omDbCheckpointServletMock.doGet(requestMock, responseMock); - String testDirName = folder.resolve("testDir").toString(); - String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; - File newDbDir = new File(newDbDirName); - assertTrue(newDbDir.mkdirs()); - FileUtil.unTar(tempFile, newDbDir); - Set allPathsInTarball = getAllPathsInTarball(newDbDir); - // create hardlinks now - OmSnapshotUtils.createHardLinks(newDbDir.toPath(), false); - for (Path old : allPathsInTarball) { - assertTrue(old.toFile().delete()); - } - Path snapshotDbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, - OM_DB_NAME + "-" + snapshotToModify.getSnapshotId()); - deleteWalFiles(snapshotDbDir); - assertTrue(Files.exists(snapshotDbDir)); - String value = getValueFromSnapshotDeleteTable(dummyKey, snapshotDbDir.toString()); - assertNotNull(value); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception { - setupMocks(); - Path dbDir = folder.resolve("db_data"); - Files.createDirectories(dbDir); - // Create dummy files: one SST, one non-SST - Path sstFile = dbDir.resolve("test.sst"); - Files.write(sstFile, "sst content".getBytes(StandardCharsets.UTF_8)); // Write some content to make it non-empty - - Path nonSstFile = dbDir.resolve("test.log"); - Files.write(nonSstFile, "log content".getBytes(StandardCharsets.UTF_8)); - Set sstFilesToExclude = new HashSet<>(); - AtomicLong maxTotalSstSize = new AtomicLong(1000000); // Sufficient size - Map hardLinkFileMap = new java.util.HashMap<>(); - Path tmpDir = folder.resolve("tmp"); - Files.createDirectories(tmpDir); - TarArchiveOutputStream mockArchiveOutputStream = mock(TarArchiveOutputStream.class); - List fileNames = new ArrayList<>(); - try (MockedStatic archiverMock = mockStatic(Archiver.class)) { - archiverMock.when(() -> Archiver.linkAndIncludeFile(any(), any(), any(), any())).thenAnswer(invocation -> { - // Get the actual mockArchiveOutputStream passed from writeDBToArchive - TarArchiveOutputStream aos = invocation.getArgument(2); - File sourceFile = invocation.getArgument(0); - String fileId = invocation.getArgument(1); - fileNames.add(sourceFile.getName()); - aos.putArchiveEntry(new TarArchiveEntry(sourceFile, fileId)); - aos.write(new byte[100], 0, 100); // Simulate writing - aos.closeArchiveEntry(); - return 100L; - }); - boolean success = omDbCheckpointServletMock.writeDBToArchive( - sstFilesToExclude, dbDir, maxTotalSstSize, mockArchiveOutputStream, - tmpDir, hardLinkFileMap, expectOnlySstFiles); - assertTrue(success); - verify(mockArchiveOutputStream, times(fileNames.size())).putArchiveEntry(any()); - verify(mockArchiveOutputStream, times(fileNames.size())).closeArchiveEntry(); - verify(mockArchiveOutputStream, times(fileNames.size())).write(any(byte[].class), anyInt(), - anyInt()); // verify write was called once - - boolean containsNonSstFile = false; - for (String fileName : fileNames) { - if (expectOnlySstFiles) { - assertTrue(fileName.endsWith(".sst"), "File is not an SST File"); - } else { - containsNonSstFile = true; - } - } - - if (!expectOnlySstFiles) { - assertTrue(containsNonSstFile, "SST File is not expected"); - } - } - } - - @Test - public void testBootstrapLockCoordination() throws Exception { - // Create mocks for all background services - KeyDeletingService mockDeletingService = mock(KeyDeletingService.class); - DirectoryDeletingService mockDirDeletingService = mock(DirectoryDeletingService.class); - SstFilteringService 
mockFilteringService = mock(SstFilteringService.class); - SnapshotDeletingService mockSnapshotDeletingService = mock(SnapshotDeletingService.class); - RocksDBCheckpointDiffer mockCheckpointDiffer = mock(RocksDBCheckpointDiffer.class); - // Create mock locks for each service - BootstrapStateHandler.Lock mockDeletingLock = mock(BootstrapStateHandler.Lock.class); - BootstrapStateHandler.Lock mockDirDeletingLock = mock(BootstrapStateHandler.Lock.class); - BootstrapStateHandler.Lock mockFilteringLock = mock(BootstrapStateHandler.Lock.class); - BootstrapStateHandler.Lock mockSnapshotDeletingLock = mock(BootstrapStateHandler.Lock.class); - BootstrapStateHandler.Lock mockCheckpointDifferLock = mock(BootstrapStateHandler.Lock.class); - // Configure service mocks to return their respective locks - when(mockDeletingService.getBootstrapStateLock()).thenReturn(mockDeletingLock); - when(mockDirDeletingService.getBootstrapStateLock()).thenReturn(mockDirDeletingLock); - when(mockFilteringService.getBootstrapStateLock()).thenReturn(mockFilteringLock); - when(mockSnapshotDeletingService.getBootstrapStateLock()).thenReturn(mockSnapshotDeletingLock); - when(mockCheckpointDiffer.getBootstrapStateLock()).thenReturn(mockCheckpointDifferLock); - // Mock KeyManager and its services - KeyManager mockKeyManager = mock(KeyManager.class); - when(mockKeyManager.getDeletingService()).thenReturn(mockDeletingService); - when(mockKeyManager.getDirDeletingService()).thenReturn(mockDirDeletingService); - when(mockKeyManager.getSnapshotSstFilteringService()).thenReturn(mockFilteringService); - when(mockKeyManager.getSnapshotDeletingService()).thenReturn(mockSnapshotDeletingService); - // Mock OMMetadataManager and Store - OMMetadataManager mockMetadataManager = mock(OMMetadataManager.class); - DBStore mockStore = mock(DBStore.class); - when(mockMetadataManager.getStore()).thenReturn(mockStore); - when(mockStore.getRocksDBCheckpointDiffer()).thenReturn(mockCheckpointDiffer); - // Mock OzoneManager - OzoneManager mockOM = mock(OzoneManager.class); - when(mockOM.getKeyManager()).thenReturn(mockKeyManager); - when(mockOM.getMetadataManager()).thenReturn(mockMetadataManager); - // Create the actual Lock instance (this tests the real implementation) - OMDBCheckpointServlet.Lock bootstrapLock = new OMDBCheckpointServlet.Lock(mockOM); - // Test successful lock acquisition - BootstrapStateHandler.Lock result = bootstrapLock.lock(); - // Verify all service locks were acquired - verify(mockDeletingLock).lock(); - verify(mockDirDeletingLock).lock(); - verify(mockFilteringLock).lock(); - verify(mockSnapshotDeletingLock).lock(); - verify(mockCheckpointDifferLock).lock(); - // Verify double buffer flush was called - verify(mockOM).awaitDoubleBufferFlush(); - // Verify the lock returns itself - assertEquals(bootstrapLock, result); - // Test unlock - bootstrapLock.unlock(); - // Verify all service locks were released - verify(mockDeletingLock).unlock(); - verify(mockDirDeletingLock).unlock(); - verify(mockFilteringLock).unlock(); - verify(mockSnapshotDeletingLock).unlock(); - verify(mockCheckpointDifferLock).unlock(); - } - - /** - * Verifies that bootstrap lock acquisition blocks background services during checkpoint creation, - * preventing race conditions between checkpoint and service operations. 
- */ - @Test - public void testBootstrapLockBlocksMultipleServices() throws Exception { - setupCluster(); - // Initialize servlet - OMDBCheckpointServletInodeBasedXfer servlet = new OMDBCheckpointServletInodeBasedXfer(); - ServletConfig servletConfig = mock(ServletConfig.class); - ServletContext servletContext = mock(ServletContext.class); - when(servletConfig.getServletContext()).thenReturn(servletContext); - when(servletContext.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)).thenReturn(om); - servlet.init(servletConfig); - - BootstrapStateHandler.Lock bootstrapLock = servlet.getBootstrapStateLock(); - // Test multiple services being blocked - CountDownLatch bootstrapAcquired = new CountDownLatch(1); - CountDownLatch allServicesCompleted = new CountDownLatch(3); // 3 background services - AtomicInteger servicesBlocked = new AtomicInteger(0); - AtomicInteger servicesSucceeded = new AtomicInteger(0); - // Checkpoint thread holds bootstrap lock - Thread checkpointThread = new Thread(() -> { - try { - LOG.info("Acquiring bootstrap lock for checkpoint..."); - BootstrapStateHandler.Lock acquired = bootstrapLock.lock(); - bootstrapAcquired.countDown(); - Thread.sleep(3000); // Hold for 3 seconds - LOG.info("Releasing bootstrap lock..."); - acquired.unlock(); - } catch (Exception e) { - fail("Checkpoint failed: " + e.getMessage()); - } - }); - - BiFunction createServiceThread = - (serviceName, service) -> new Thread(() -> { - try { - bootstrapAcquired.await(); - if (service != null) { - LOG.info("{} : Trying to acquire lock...", serviceName); - servicesBlocked.incrementAndGet(); - BootstrapStateHandler.Lock serviceLock = service.getBootstrapStateLock(); - serviceLock.lock(); // Should block! - servicesBlocked.decrementAndGet(); - servicesSucceeded.incrementAndGet(); - LOG.info(" {} : Lock acquired!", serviceName); - serviceLock.unlock(); - } - allServicesCompleted.countDown(); - } catch (Exception e) { - LOG.error("{} failed", serviceName, e); - allServicesCompleted.countDown(); - } - }); - // Start all threads - checkpointThread.start(); - Thread keyDeletingThread = createServiceThread.apply("KeyDeletingService", - om.getKeyManager().getDeletingService()); - Thread dirDeletingThread = createServiceThread.apply("DirectoryDeletingService", - om.getKeyManager().getDirDeletingService()); - Thread snapshotDeletingThread = createServiceThread.apply("SnapshotDeletingService", - om.getKeyManager().getSnapshotDeletingService()); - keyDeletingThread.start(); - dirDeletingThread.start(); - snapshotDeletingThread.start(); - // Wait a bit, then verify multiple services are blocked - Thread.sleep(1000); - int blockedCount = servicesBlocked.get(); - assertTrue(blockedCount > 0, "At least one service should be blocked"); - assertEquals(0, servicesSucceeded.get(), "No services should have succeeded yet"); - // Wait for completion - assertTrue(allServicesCompleted.await(10, TimeUnit.SECONDS)); - // Verify all services eventually succeeded - assertEquals(0, servicesBlocked.get(), "No services should be blocked anymore"); - assertTrue(servicesSucceeded.get() > 0, "Services should have succeeded after lock release"); - } - - private static void deleteWalFiles(Path snapshotDbDir) throws IOException { - try (Stream filesInTarball = Files.list(snapshotDbDir)) { - List files = filesInTarball.filter(p -> p.toString().contains(".log")) - .collect(Collectors.toList()); - for (Path p : files) { - Files.delete(p); - } - } - } - - private static Set getAllPathsInTarball(File newDbDir) throws IOException { - Set 
allPathsInTarball = new HashSet<>(); - try (Stream filesInTarball = Files.list(newDbDir.toPath())) { - List files = filesInTarball.collect(Collectors.toList()); - for (Path p : files) { - File file = p.toFile(); - if (file.getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) { - continue; - } - allPathsInTarball.add(p); - } - } - return allPathsInTarball; - } - - private void writeDummyKeyToDeleteTableOfSnapshotDB(OzoneSnapshot snapshotToModify, String bucketName, - String volumeName, String keyName) - throws IOException { - try (UncheckedAutoCloseableSupplier supplier = om.getOmSnapshotManager() - .getSnapshot(snapshotToModify.getSnapshotId())) { - OmSnapshot omSnapshot = supplier.get(); - OmKeyInfo dummyOmKeyInfo = - new OmKeyInfo.Builder().setBucketName(bucketName).setVolumeName(volumeName).setKeyName(keyName) - .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE)).build(); - RepeatedOmKeyInfo dummyRepeatedKeyInfo = - new RepeatedOmKeyInfo.Builder().setOmKeyInfos(Collections.singletonList(dummyOmKeyInfo)).build(); - omSnapshot.getMetadataManager().getDeletedTable().put(dummyOmKeyInfo.getKeyName(), dummyRepeatedKeyInfo); - } - } - - private void setupClusterAndMocks(String volumeName, String bucketName, - AtomicReference realCheckpoint, boolean includeSnapshots) throws Exception { - setupCluster(); - setupMocks(); - om.getKeyManager().getSnapshotSstFilteringService().pause(); - when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)) - .thenReturn(String.valueOf(includeSnapshots)); - // Create a "spy" dbstore keep track of the checkpoint. - writeData(volumeName, bucketName, true); - DBStore dbStore = om.getMetadataManager().getStore(); - DBStore spyDbStore = spy(dbStore); - when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> { - DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true)); - // Don't delete the checkpoint, because we need to compare it - // with the snapshot data. 
- doNothing().when(checkpoint).cleanupCheckpoint(); - realCheckpoint.set(checkpoint); - return checkpoint; - }); - // Init the mock with the spyDbstore - doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(), - eq(false), any(), any(), eq(false)); - omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(), - false, - om.getOmAdminUsernames(), om.getOmAdminGroups(), false); - when(responseMock.getOutputStream()).thenReturn(servletOutputStream); - } - - String getValueFromSnapshotDeleteTable(String key, String snapshotDB) { - String result = null; - List cfDescriptors = new ArrayList<>(); - int count = 1; - int deletedTableCFIndex = 0; - cfDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(StandardCharsets.UTF_8))); - for (String cfName : OMDBDefinition.getAllColumnFamilies()) { - if (cfName.equals(OMDBDefinition.DELETED_TABLE)) { - deletedTableCFIndex = count; - } - cfDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(StandardCharsets.UTF_8))); - count++; - } - // For holding handles - List cfHandles = new ArrayList<>(); - try (DBOptions options = new DBOptions().setCreateIfMissing(false).setCreateMissingColumnFamilies(true); - RocksDB db = RocksDB.openReadOnly(options, snapshotDB, cfDescriptors, cfHandles)) { - - ColumnFamilyHandle deletedTableCF = cfHandles.get(deletedTableCFIndex); // 0 is default - byte[] value = db.get(deletedTableCF, key.getBytes(StandardCharsets.UTF_8)); - if (value != null) { - result = new String(value, StandardCharsets.UTF_8); - } - } catch (Exception e) { - fail("Exception while reading from snapshot DB " + e.getMessage()); - } finally { - for (ColumnFamilyHandle handle : cfHandles) { - handle.close(); - } - } - return result; - } - - public static Map> readFileToMap(String filePath) throws IOException { - Map> dataMap = new HashMap<>(); - try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) { - String line; - while ((line = reader.readLine()) != null) { - String trimmedLine = line.trim(); - if (!trimmedLine.contains("\t")) { - continue; - } - int tabIndex = trimmedLine.indexOf("\t"); - if (tabIndex > 0) { - // value is the full path that needs to be constructed - String value = trimmedLine.substring(0, tabIndex).trim(); - // key is the inodeID - String key = getInode(trimmedLine.substring(tabIndex + 1).trim()); - if (!key.isEmpty() && !value.isEmpty()) { - dataMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value); - } - } - } - } - for (Map.Entry> entry : dataMap.entrySet()) { - Collections.sort(entry.getValue()); - } - return dataMap; - } - - private void populateInodesOfFilesInDirectory(DBStore dbStore, Path dbLocation, - Set inodesFromOmDbCheckpoint, Map> hardlinkMap) throws IOException { - try (Stream filesInOmDb = Files.list(dbLocation)) { - List files = filesInOmDb.collect(Collectors.toList()); - for (Path p : files) { - if (Files.isDirectory(p) || p.toFile().getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) { - continue; - } - String inode = getInode(OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(p)); - Path metadataDir = OMStorage.getOmDbDir(conf).toPath(); - String path = metadataDir.relativize(p).toString(); - if (path.contains(OM_CHECKPOINT_DIR)) { - path = metadataDir.relativize(dbStore.getDbLocation().toPath().resolve(p.getFileName())).toString(); - } - if (path.startsWith(OM_DB_NAME)) { - Path fileName = Paths.get(path).getFileName(); - // fileName will not be null, added null check for findbugs - if (fileName != 
null) { - path = fileName.toString(); - } - } - hardlinkMap.computeIfAbsent(inode, k -> new ArrayList<>()).add(path); - inodesFromOmDbCheckpoint.add(inode); - } - } - for (Map.Entry> entry : hardlinkMap.entrySet()) { - Collections.sort(entry.getValue()); - } - } - - private String getSnapshotDBPath(String checkPointDir) { - return OMStorage.getOmDbDir(cluster.getConf()) + - OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + - OM_DB_NAME + checkPointDir; - } - - private static String getInode(String inodeAndMtime) { - String inode = inodeAndMtime.split("-")[0]; - return inode; - } - - private void writeData(String volumeName, String bucketName, boolean includeSnapshots) throws Exception { - OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client, volumeName, bucketName); - for (int i = 0; i < 10; i++) { - TestDataUtil.createKey(bucket, "key" + i, - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "sample".getBytes(StandardCharsets.UTF_8)); - om.getMetadataManager().getStore().flushDB(); - } - if (includeSnapshots) { - TestDataUtil.createKey(bucket, "keysnap1", - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "sample".getBytes(StandardCharsets.UTF_8)); - TestDataUtil.createKey(bucket, "keysnap2", - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "sample".getBytes(StandardCharsets.UTF_8)); - client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot10"); - client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot20"); - } - } -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java index a1de8fc377a..7e788819374 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -248,8 +248,8 @@ public void testInstallSnapshot(@TempDir Path tempDir) throws Exception { // Wait & for follower to update transactions to leader snapshot index. // Timeout error if follower does not load update within 10s GenericTestUtils.waitFor(() -> { - long index = followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex(); - return index >= leaderOMSnapshotIndex - 1; + return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex() + >= leaderOMSnapshotIndex - 1; }, 100, 30_000); long followerOMLastAppliedIndex = @@ -569,7 +569,7 @@ private IncrementData getNextIncrementalTarball( Path increment = Paths.get(tempDir.toString(), "increment" + numKeys); assertTrue(increment.toFile().mkdirs()); unTarLatestTarBall(followerOM, increment); - List sstFiles = HAUtils.getExistingFiles(increment.toFile()); + List sstFiles = HAUtils.getExistingSstFiles(increment.toFile()); Path followerCandidatePath = followerOM.getOmSnapshotProvider(). getCandidateDir().toPath(); @@ -655,7 +655,7 @@ public void testInstallIncrementalSnapshotWithFailure() throws Exception { // Corrupt the mixed checkpoint in the candidate DB dir File followerCandidateDir = followerOM.getOmSnapshotProvider(). 
getCandidateDir(); - List sstList = HAUtils.getExistingFiles(followerCandidateDir); + List sstList = HAUtils.getExistingSstFiles(followerCandidateDir); assertThat(sstList.size()).isGreaterThan(0); for (int i = 0; i < sstList.size(); i += 2) { File victimSst = new File(followerCandidateDir, sstList.get(i)); @@ -1010,7 +1010,6 @@ public void testInstallCorruptedCheckpointFailure() throws Exception { DBCheckpoint leaderDbCheckpoint = leaderOM.getMetadataManager().getStore() .getCheckpoint(false); Path leaderCheckpointLocation = leaderDbCheckpoint.getCheckpointLocation(); - OmSnapshotUtils.createHardLinks(leaderCheckpointLocation, true); TransactionInfo leaderCheckpointTrxnInfo = OzoneManagerRatisUtils .getTrxnInfoFromCheckpoint(conf, leaderCheckpointLocation); @@ -1174,23 +1173,21 @@ public void pause() throws IOException { } else { // Each time we get a new tarball add a set of // its sst file to the list, (i.e. one per tarball.) - sstSetList.add(getFilenames(tarball)); + sstSetList.add(getSstFilenames(tarball)); } } // Get Size of sstfiles in tarball. private long getSizeOfSstFiles(File tarball) throws IOException { FileUtil.unTar(tarball, tempDir.toFile()); - OmSnapshotUtils.createHardLinks(tempDir, true); - List sstPaths = Files.list(tempDir).collect(Collectors.toList()); - long totalFileSize = 0; + List sstPaths = Files.walk(tempDir).filter( + path -> path.toString().endsWith(".sst")). + collect(Collectors.toList()); + long sstSize = 0; for (Path sstPath : sstPaths) { - File file = sstPath.toFile(); - if (file.isFile() && file.getName().endsWith(".sst")) { - totalFileSize += Files.size(sstPath); - } + sstSize += Files.size(sstPath); } - return totalFileSize; + return sstSize; } private void createEmptyTarball(File dummyTarFile) @@ -1201,18 +1198,21 @@ private void createEmptyTarball(File dummyTarFile) archiveOutputStream.close(); } - // Return a list of files in tarball. - private Set getFilenames(File tarball) + // Return a list of sst files in tarball. + private Set getSstFilenames(File tarball) throws IOException { - Set fileNames = new HashSet<>(); + Set sstFilenames = new HashSet<>(); try (TarArchiveInputStream tarInput = new TarArchiveInputStream(Files.newInputStream(tarball.toPath()))) { TarArchiveEntry entry; while ((entry = tarInput.getNextTarEntry()) != null) { - fileNames.add(entry.getName()); + String name = entry.getName(); + if (name.toLowerCase().endsWith(".sst")) { + sstFilenames.add(entry.getName()); + } } } - return fileNames; + return sstFilenames; } // Find the tarball in the dir. 
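Note on the incremental-install flow exercised by the tests above: after each tarball is untarred into the candidate directory, the follower gathers the names of the SST files it already holds and reports them back to the leader, which then leaves those files out of the next batch. The sketch below shows that bookkeeping in isolation; the class and method names are hypothetical, it is not the HAUtils or test helper code in this diff, and it only assumes commons-compress for tar reading.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

/** Hypothetical helper; illustrates the exclude-list bookkeeping only. */
final class SstInventorySketch {

  /** Names of .sst files already present under the untarred candidate directory. */
  static Set<String> existingSstFiles(Path candidateDir) throws IOException {
    try (Stream<Path> files = Files.walk(candidateDir)) {
      Set<String> names = new HashSet<>();
      files.filter(p -> p.toString().endsWith(".sst"))
          .forEach(p -> names.add(p.getFileName().toString()));
      return names;
    }
  }

  /** Names of .sst entries inside a tarball, without extracting it. */
  static Set<String> sstEntriesInTarball(InputStream tarStream) throws IOException {
    Set<String> names = new HashSet<>();
    try (TarArchiveInputStream tar = new TarArchiveInputStream(tarStream)) {
      TarArchiveEntry entry;
      while ((entry = tar.getNextTarEntry()) != null) {
        if (entry.getName().endsWith(".sst")) {
          names.add(entry.getName());
        }
      }
    }
    return names;
  }

  private SstInventorySketch() { }
}
```

On the next download request the follower would send something like existingSstFiles(candidateDir) as its exclude list, which is what writeFormData(connection, HAUtils.getExistingSstFiles(getCandidateDir())) does in OmRatisSnapshotProvider further down in this diff.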
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java index 4d5f1dda1c7..bdf1f7d1dab 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java @@ -124,8 +124,6 @@ public void testDownloadCheckpoint() throws Exception { private long getDownloadedSnapshotIndex(DBCheckpoint dbCheckpoint) throws Exception { - OmSnapshotUtils.createHardLinks(dbCheckpoint.getCheckpointLocation(), true); - TransactionInfo trxnInfoFromCheckpoint = OzoneManagerRatisUtils.getTrxnInfoFromCheckpoint(conf, dbCheckpoint.getCheckpointLocation()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java deleted file mode 100644 index 28769f75409..00000000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ /dev/null @@ -1,499 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.om; - -import static org.apache.hadoop.hdds.utils.Archiver.includeFile; -import static org.apache.hadoop.hdds.utils.Archiver.linkAndIncludeFile; -import static org.apache.hadoop.hdds.utils.Archiver.tar; -import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag; -import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR; -import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST; -import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; -import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK; -import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.includeSnapshotData; -import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.logEstimatedTarballSize; -import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX; -import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX; - -import com.google.common.annotations.VisibleForTesting; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Stream; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import org.apache.commons.compress.archivers.ArchiveOutputStream; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.recon.ReconConfig; -import org.apache.hadoop.hdds.utils.DBCheckpointServlet; -import org.apache.hadoop.hdds.utils.db.DBCheckpoint; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; -import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; -import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Specialized OMDBCheckpointServlet implementation that transfers Ozone Manager - * database checkpoints using inode-based deduplication. - *

- * This servlet constructs checkpoint archives by examining file inodes, - * ensuring that files with the same inode (i.e., hardlinks or duplicates) - * are only transferred once. It maintains mappings from inode IDs to file - * paths, manages hardlink information, and enforces snapshot and SST file - * size constraints as needed. - *

- * This approach optimizes checkpoint streaming by reducing redundant data - * transfer, especially in environments where RocksDB and snapshotting result - * in multiple hardlinks to the same physical data. - */ -public class OMDBCheckpointServletInodeBasedXfer extends DBCheckpointServlet { - - protected static final Logger LOG = - LoggerFactory.getLogger(OMDBCheckpointServletInodeBasedXfer.class); - private static final long serialVersionUID = 1L; - private transient BootstrapStateHandler.Lock lock; - - @Override - public void init() throws ServletException { - OzoneManager om = (OzoneManager) getServletContext() - .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE); - - if (om == null) { - LOG.error("Unable to initialize OMDBCheckpointServlet. OM is null"); - return; - } - - OzoneConfiguration conf = getConf(); - // Only Ozone Admins and Recon are allowed - Collection allowedUsers = - new LinkedHashSet<>(om.getOmAdminUsernames()); - Collection allowedGroups = om.getOmAdminGroups(); - ReconConfig reconConfig = conf.getObject(ReconConfig.class); - String reconPrincipal = reconConfig.getKerberosPrincipal(); - if (!reconPrincipal.isEmpty()) { - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(reconPrincipal); - allowedUsers.add(ugi.getShortUserName()); - } - - initialize(om.getMetadataManager().getStore(), - om.getMetrics().getDBCheckpointMetrics(), - om.getAclsEnabled(), - allowedUsers, - allowedGroups, - om.isSpnegoEnabled()); - lock = new OMDBCheckpointServlet.Lock(om); - } - - @Override - public BootstrapStateHandler.Lock getBootstrapStateLock() { - return lock; - } - - @Override - public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response, - boolean isFormData, boolean flush) { - String[] sstParam = isFormData ? - parseFormDataParameters(request) : request.getParameterValues( - OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST); - Set receivedSstFiles = extractFilesToExclude(sstParam); - Path tmpdir = null; - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { - tmpdir = Files.createTempDirectory(getBootstrapTempData().toPath(), - "bootstrap-data-"); - if (tmpdir == null) { - throw new IOException("tmp dir is null"); - } - String tarName = "om.data-" + System.currentTimeMillis() + ".tar"; - response.setContentType("application/x-tar"); - response.setHeader("Content-Disposition", "attachment; filename=\"" + tarName + "\""); - Instant start = Instant.now(); - writeDbDataToStream(request, response.getOutputStream(), receivedSstFiles, tmpdir); - Instant end = Instant.now(); - long duration = Duration.between(start, end).toMillis(); - LOG.info("Time taken to write the checkpoint to response output " + - "stream: {} milliseconds", duration); - logSstFileList(receivedSstFiles, - "Excluded {} SST files from the latest checkpoint{}: {}", 5); - } catch (Exception e) { - LOG.error( - "Unable to process metadata snapshot request. 
", e); - response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - } finally { - try { - if (tmpdir != null) { - FileUtils.deleteDirectory(tmpdir.toFile()); - } - } catch (IOException e) { - LOG.error("unable to delete: " + tmpdir, e.toString()); - } - } - } - - Path getSstBackupDir() { - RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer(); - return new File(differ.getSSTBackupDir()).toPath(); - } - - Path getCompactionLogDir() { - RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer(); - return new File(differ.getCompactionLogDir()).toPath(); - } - - /** - * Streams the Ozone Manager database checkpoint and (optionally) snapshot-related data - * as a tar archive to the provided output stream. This method handles deduplication - * based on file inodes to avoid transferring duplicate files (such as hardlinks), - * supports excluding specific SST files, enforces maximum total SST file size limits, - * and manages temporary directories for processing. - * - * The method processes snapshot directories and backup/compaction logs (if requested), - * then finally the active OM database. It also writes a hardlink mapping file - * and includes a completion flag for Ratis snapshot streaming. - * - * @param request The HTTP servlet request containing parameters for the snapshot. - * @param destination The output stream to which the tar archive is written. - * @param sstFilesToExclude Set of SST file identifiers to exclude from the archive. - * @param tmpdir Temporary directory for staging files during archiving. - * @throws IOException if an I/O error occurs during processing or streaming. - */ - - public void writeDbDataToStream(HttpServletRequest request, OutputStream destination, - Set sstFilesToExclude, Path tmpdir) throws IOException { - DBCheckpoint checkpoint = null; - OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE); - OMMetadataManager omMetadataManager = om.getMetadataManager(); - boolean includeSnapshotData = includeSnapshotData(request); - AtomicLong maxTotalSstSize = new AtomicLong(getConf().getLong(OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, - OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT)); - - Set snapshotPaths = Collections.emptySet(); - - if (!includeSnapshotData) { - maxTotalSstSize.set(Long.MAX_VALUE); - } else { - snapshotPaths = getSnapshotDirs(omMetadataManager); - } - - if (sstFilesToExclude.isEmpty()) { - logEstimatedTarballSize(getDbStore().getDbLocation().toPath(), snapshotPaths); - } - - boolean shouldContinue = true; - - Map hardLinkFileMap = new HashMap<>(); - try (ArchiveOutputStream archiveOutputStream = tar(destination)) { - if (includeSnapshotData) { - // Process each snapshot db path and write it to archive - for (Path snapshotDbPath : snapshotPaths) { - if (!shouldContinue) { - break; - } - shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath, - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true); - } - - - if (shouldContinue) { - shouldContinue = writeDBToArchive(sstFilesToExclude, getSstBackupDir(), - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true); - } - - if (shouldContinue) { - shouldContinue = writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true); - } - } - - if (shouldContinue) { - // we finished transferring files from snapshot DB's by now and - // this is the last step where we transfer the active om.db contents - 
checkpoint = createAndPrepareCheckpoint(tmpdir, true); - // unlimited files as we want the Active DB contents to be transferred in a single batch - maxTotalSstSize.set(Long.MAX_VALUE); - Path checkpointDir = checkpoint.getCheckpointLocation(); - writeDBToArchive(sstFilesToExclude, checkpointDir, - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false); - if (includeSnapshotData) { - Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName()); - Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName()); - writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir, - hardLinkFileMap, getCompactionLogDir(), false); - writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir, - hardLinkFileMap, getSstBackupDir(), false); - // This is done to ensure all data to be copied correctly is flushed in the snapshot DB - transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize, - archiveOutputStream, hardLinkFileMap); - } - writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream); - includeRatisSnapshotCompleteFlag(archiveOutputStream); - } - - } catch (IOException ioe) { - LOG.error("got exception writing to archive " + ioe); - throw ioe; - } finally { - cleanupCheckpoint(checkpoint); - } - } - - /** - * Transfers the snapshot data from the specified snapshot directories into the archive output stream, - * handling deduplication and managing resource locking. - * - * @param sstFilesToExclude Set of SST file identifiers to exclude from the archive. - * @param tmpdir Temporary directory for intermediate processing. - * @param snapshotPaths Set of paths to snapshot directories to be processed. - * @param maxTotalSstSize AtomicLong to track the cumulative size of SST files included. - * @param archiveOutputStream Archive output stream to write the snapshot data. - * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication. - * @throws IOException if an I/O error occurs during processing. 
- */ - private void transferSnapshotData(Set sstFilesToExclude, Path tmpdir, Set snapshotPaths, - AtomicLong maxTotalSstSize, ArchiveOutputStream archiveOutputStream, - Map hardLinkFileMap) throws IOException { - OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE); - OMMetadataManager omMetadataManager = om.getMetadataManager(); - for (Path snapshotDir : snapshotPaths) { - String snapshotId = OmSnapshotManager.extractSnapshotIDFromCheckpointDirName(snapshotDir.toString()); - omMetadataManager.getLock().acquireReadLock(SNAPSHOT_DB_LOCK, snapshotId); - try { - // invalidate closes the snapshot DB - om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId)); - writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, - hardLinkFileMap, false); - Path snapshotLocalPropertyYaml = Paths.get( - OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir)); - if (Files.exists(snapshotLocalPropertyYaml)) { - File yamlFile = snapshotLocalPropertyYaml.toFile(); - hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName()); - linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir); - } - } finally { - omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK, snapshotId); - } - } - } - - @VisibleForTesting - boolean writeDBToArchive(Set sstFilesToExclude, Path dir, - AtomicLong maxTotalSstSize, ArchiveOutputStream archiveOutputStream, - Path tmpdir, Map hardLinkFileMap, boolean onlySstFile) throws IOException { - return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize, - archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile); - } - - private static void cleanupCheckpoint(DBCheckpoint checkpoint) { - if (checkpoint != null) { - try { - checkpoint.cleanupCheckpoint(); - } catch (IOException e) { - LOG.error("Error trying to clean checkpoint at {} .", - checkpoint.getCheckpointLocation().toString()); - } - } - } - - /** - * Writes a hardlink mapping file to the archive, which maps file IDs to their - * relative paths. This method generates the mapping file based on the provided - * hardlink metadata and adds it to the archive output stream. - * - * @param conf Ozone configuration for the OM instance. - * @param hardlinkFileMap A map where the key is the absolute file path - * and the value is its corresponding file ID. - * @param archiveOutputStream The archive output stream to which the hardlink - * file should be written. - * @throws IOException If an I/O error occurs while creating or writing the - * hardlink file. - */ - private static void writeHardlinkFile(OzoneConfiguration conf, Map hardlinkFileMap, - ArchiveOutputStream archiveOutputStream) throws IOException { - Path data = Files.createTempFile(DATA_PREFIX, DATA_SUFFIX); - Path metaDirPath = OMStorage.getOmDbDir(conf).toPath(); - StringBuilder sb = new StringBuilder(); - - for (Map.Entry entry : hardlinkFileMap.entrySet()) { - Path p = Paths.get(entry.getKey()); - String fileId = entry.getValue(); - Path relativePath = metaDirPath.relativize(p); - // if the file is in "om.db" directory, strip off the 'o - // m.db' name from the path - // and only keep the file name as this would be created in the current dir of the untarred dir - // on the follower. 
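To make the relativization rule in the comment above concrete: the hardlink manifest is a plain-text file with one "relativePath<TAB>fileId" entry per line, where the file id is the "{inode}-{mtime}" string used for deduplication and entries that resolve to the active om.db directory keep only their bare file name. A hypothetical excerpt is shown below (<TAB> stands for a single tab character; inode and mtime values are invented, and the snapshot directory name is a placeholder):

```
000051.sst<TAB>8812450-1733000000000
db.snapshots/checkpointState/om.db-<snapshotId>/000051.sst<TAB>8812450-1733000000000
db.snapshots/checkpointState/om.db-<snapshotId>/CURRENT<TAB>8812507-1733000000123
```

Two paths sharing a file id is exactly what lets the follower recreate them as hardlinks of a single transferred copy.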
- if (relativePath.startsWith(OM_DB_NAME)) { - relativePath = relativePath.getFileName(); - } - sb.append(relativePath).append('\t').append(fileId).append('\n'); - } - Files.write(data, sb.toString().getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING); - includeFile(data.toFile(), OmSnapshotManager.OM_HARDLINK_FILE, archiveOutputStream); - } - - /** - * Gets the configuration from the OzoneManager context. - * - * @return OzoneConfiguration instance - */ - private OzoneConfiguration getConf() { - return ((OzoneManager) getServletContext() - .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) - .getConfiguration(); - } - - /** - * Collects paths to all snapshot databases. - * - * @param omMetadataManager OMMetadataManager instance - * @return Set of paths to snapshot databases - * @throws IOException if an I/O error occurs - */ - Set getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOException { - Set snapshotPaths = new HashSet<>(); - SnapshotChainManager snapshotChainManager = new SnapshotChainManager(omMetadataManager); - for (SnapshotChainInfo snapInfo : snapshotChainManager.getGlobalSnapshotChain().values()) { - String snapshotDir = - OmSnapshotManager.getSnapshotPath(getConf(), SnapshotInfo.getCheckpointDirName(snapInfo.getSnapshotId())); - Path path = Paths.get(snapshotDir); - snapshotPaths.add(path); - } - return snapshotPaths; - } - - /** - * Writes database files to the archive, handling deduplication based on inode IDs. - * Here the dbDir could either be a snapshot db directory, the active om.db, - * compaction log dir, sst backup dir. - * - * @param sstFilesToExclude Set of SST file IDs to exclude from the archive - * @param dbDir Directory containing database files to archive - * @param maxTotalSstSize Maximum total size of SST files to include - * @param archiveOutputStream Archive output stream - * @param tmpDir Temporary directory for processing - * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication - * @param destDir Destination directory for the archived files. If null, - * the archived files are not moved to this directory. - * @param onlySstFile If true, only SST files are processed. If false, all files are processed. - *

- * This parameter is typically set to {@code true} for initial iterations to - * prioritize SST file transfer, and then set to {@code false} only for the - * final iteration to ensure all remaining file types are transferred. - * @return true if processing should continue, false if size limit reached - * @throws IOException if an I/O error occurs - */ - @SuppressWarnings("checkstyle:ParameterNumber") - private boolean writeDBToArchive(Set sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize, - ArchiveOutputStream archiveOutputStream, Path tmpDir, - Map hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException { - if (!Files.exists(dbDir)) { - LOG.warn("DB directory {} does not exist. Skipping.", dbDir); - return true; - } - long bytesWritten = 0L; - int filesWritten = 0; - long lastLoggedTime = Time.monotonicNow(); - try (Stream files = Files.list(dbDir)) { - Iterable iterable = files::iterator; - for (Path dbFile : iterable) { - if (!Files.isDirectory(dbFile)) { - if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) { - continue; - } - String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile); - String path = dbFile.toFile().getAbsolutePath(); - if (destDir != null) { - path = destDir.resolve(dbFile.getFileName()).toString(); - } - // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB. - if (path.contains(OM_CHECKPOINT_DIR)) { - path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString(); - } - hardLinkFileMap.put(path, fileId); - if (!sstFilesToExclude.contains(fileId)) { - long fileSize = Files.size(dbFile); - if (maxTotalSstSize.get() - fileSize <= 0) { - return false; - } - bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir); - filesWritten++; - maxTotalSstSize.addAndGet(-fileSize); - sstFilesToExclude.add(fileId); - if (Time.monotonicNow() - lastLoggedTime >= 30000) { - LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", - bytesWritten / (1024), filesWritten); - lastLoggedTime = Time.monotonicNow(); - } - } - } - } - } - return true; - } - - /** - * Creates a database checkpoint and copies compaction log and SST backup files - * into the given temporary directory. - * The copy to the temporary directory for compaction log and SST backup files - * is done to maintain a consistent view of the files in these directories. - * - * @param tmpdir Temporary directory for storing checkpoint-related files. - * @param flush If true, flushes in-memory data to disk before checkpointing. - * @return The created database checkpoint. - * @throws IOException If an error occurs during checkpoint creation or file copying. - */ - private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException { - // make tmp directories to contain the copies - Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName()); - Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName()); - - // Create checkpoint and then copy the files so that it has all the compaction entries and files. 
- DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush); - FileUtils.copyDirectory(getCompactionLogDir().toFile(), tmpCompactionLogDir.toFile()); - OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), tmpSstBackupDir.toFile()); - - return dbCheckpoint; - } -} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index 7b9beb80cf6..eae4dd6b224 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -26,7 +26,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR; -import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_SEPARATOR; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL; @@ -811,24 +810,10 @@ public static Path getSnapshotPath(OMMetadataManager omMetadataManager, UUID sna } public static String getSnapshotPath(OzoneConfiguration conf, - SnapshotInfo snapshotInfo) { - return getSnapshotPath(conf, snapshotInfo.getCheckpointDirName()); - } - - public static String getSnapshotPath(OzoneConfiguration conf, - String checkpointDirName) { + SnapshotInfo snapshotInfo) { return OMStorage.getOmDbDir(conf) + OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + - OM_DB_NAME + checkpointDirName; - } - - public static String extractSnapshotIDFromCheckpointDirName(String snapshotPath) { - // Find "om.db-" in the path and return whatever comes after - int index = snapshotPath.lastIndexOf(OM_DB_NAME); - if (index == -1 || index + OM_DB_NAME.length() + OM_SNAPSHOT_SEPARATOR.length() >= snapshotPath.length()) { - throw new IllegalArgumentException("Invalid snapshot path " + snapshotPath); - } - return snapshotPath.substring(index + OM_DB_NAME.length() + OM_SNAPSHOT_SEPARATOR.length()); + OM_DB_NAME + snapshotInfo.getCheckpointDirName(); } public static boolean isSnapshotKey(String[] keyParts) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 56e51cf4026..a4e384890de 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -3988,7 +3988,7 @@ public synchronized TermIndex installSnapshotFromLeader(String leaderId) { TermIndex termIndex = null; try { // Install hard links. 
- OmSnapshotUtils.createHardLinks(omDBCheckpoint.getCheckpointLocation(), false); + OmSnapshotUtils.createHardLinks(omDBCheckpoint.getCheckpointLocation()); termIndex = installCheckpoint(leaderId, omDBCheckpoint); } catch (Exception ex) { LOG.error("Failed to install snapshot from Leader OM.", ex); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java index 9c2688de812..355d6249806 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.om; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT; import java.io.IOException; @@ -38,8 +37,6 @@ public OzoneManagerHttpServer(MutableConfigurationSource conf, ServiceListJSONServlet.class); addServlet("dbCheckpoint", OZONE_DB_CHECKPOINT_HTTP_ENDPOINT, OMDBCheckpointServlet.class); - addServlet("dbCheckpointv2", OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2, - OMDBCheckpointServletInodeBasedXfer.class); getWebAppContext().setAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE, om); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java index 9894e8f5d6b..6d053e1e5e0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java @@ -17,8 +17,6 @@ package org.apache.hadoop.ozone.om.codec; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition; @@ -360,13 +358,5 @@ public String getName() { public String getLocationConfigKey() { return OMConfigKeys.OZONE_OM_DB_DIRS; } - - public static List getAllColumnFamilies() { - List columnFamilies = new ArrayList<>(); - COLUMN_FAMILIES.values().forEach(cf -> { - columnFamilies.add(cf.getName()); - }); - return columnFamilies; - } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java index ef0a46548a4..a343232c39e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java @@ -153,7 +153,7 @@ public void downloadSnapshot(String leaderNodeID, File targetFile) connection.setRequestProperty("Content-Type", contentTypeValue); connection.setDoOutput(true); writeFormData(connection, - HAUtils.getExistingFiles(getCandidateDir())); + HAUtils.getExistingSstFiles(getCandidateDir())); connection.connect(); int errorCode = connection.getResponseCode(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java index 497c7a064b8..f5805044b7f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java @@ -27,15 +27,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.BasicFileAttributes; -import java.nio.file.attribute.FileTime; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.ozone.om.OmSnapshotManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Ozone Manager Snapshot Utilities. @@ -44,8 +40,6 @@ public final class OmSnapshotUtils { public static final String DATA_PREFIX = "data"; public static final String DATA_SUFFIX = "txt"; - private static final Logger LOG = - LoggerFactory.getLogger(OmSnapshotUtils.class); private OmSnapshotUtils() { } @@ -71,26 +65,6 @@ public static Object getINode(Path file) throws IOException { return Files.readAttributes(file, BasicFileAttributes.class).fileKey(); } - /** - * Returns a string combining the inode (fileKey) and the last modification time (mtime) of the given file. - *

- * The returned string is formatted as "{inode}-{mtime}", where:
- * <ul>
- *   <li>{@code inode} is the unique file key obtained from the file system, typically representing
- *   the inode on POSIX systems</li>
- *   <li>{@code mtime} is the last modified time of the file in milliseconds since the epoch</li>
- * </ul>
- * - * @param file the {@link Path} to the file whose inode and modification time are to be retrieved - * @return a string in the format "{inode}-{mtime}" - * @throws IOException if an I/O error occurs - */ - public static String getFileInodeAndLastModifiedTimeString(Path file) throws IOException { - Object inode = Files.readAttributes(file, BasicFileAttributes.class).fileKey(); - FileTime mTime = Files.getLastModifiedTime(file); - return String.format("%s-%s", inode, mTime.toMillis()); - } - /** * Create file of links to add to tarball. * Format of entries are either: @@ -129,12 +103,10 @@ public static Path createHardLinkList(int truncateLength, * Create hard links listed in OM_HARDLINK_FILE. * * @param dbPath Path to db to have links created. - * @param deleteSourceFiles - Whether to delete the source files after creating the links. */ - public static void createHardLinks(Path dbPath, boolean deleteSourceFiles) throws IOException { + public static void createHardLinks(Path dbPath) throws IOException { File hardLinkFile = new File(dbPath.toString(), OmSnapshotManager.OM_HARDLINK_FILE); - List filesToDelete = new ArrayList<>(); if (hardLinkFile.exists()) { // Read file. try (Stream s = Files.lines(hardLinkFile.toPath())) { @@ -142,15 +114,9 @@ public static void createHardLinks(Path dbPath, boolean deleteSourceFiles) throw // Create a link for each line. for (String l : lines) { - String[] parts = l.split("\t"); - if (parts.length != 2) { - LOG.warn("Skipping malformed line in hardlink file: {}", l); - continue; - } - String from = parts[1]; - String to = parts[0]; + String from = l.split("\t")[1]; + String to = l.split("\t")[0]; Path fullFromPath = Paths.get(dbPath.toString(), from); - filesToDelete.add(fullFromPath); Path fullToPath = Paths.get(dbPath.toString(), to); // Make parent dir if it doesn't exist. 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index 6867f819b9c..81c9dc46554 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -151,7 +151,6 @@ public void invalidate(UUID key) {
         LOG.debug("SnapshotId: '{}' does not exist in snapshot cache.", k);
       } else {
         try {
-          v.get().getMetadataManager().getStore().flushDB();
           v.get().close();
         } catch (IOException e) {
           throw new IllegalStateException("Failed to close snapshotId: " + key, e);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index 7f808df3f97..03c3b4e4ef1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -19,7 +19,7 @@
 import static org.apache.commons.io.file.PathUtils.copyDirectory;
 import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
-import static org.apache.hadoop.hdds.utils.HAUtils.getExistingFiles;
+import static org.apache.hadoop.hdds.utils.HAUtils.getExistingSstFiles;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
@@ -35,6 +35,7 @@
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.VOLUME_TABLE;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.getINode;
+import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -456,7 +457,7 @@ public void testHardLinkCreation() throws IOException {
     File s1FileLink = new File(followerSnapDir2, "s1.sst");

     // Create links on the follower from list.
-    OmSnapshotUtils.createHardLinks(candidateDir.toPath(), false);
+    OmSnapshotUtils.createHardLinks(candidateDir.toPath());

     // Confirm expected follower links.
     assertTrue(s1FileLink.exists());
@@ -499,16 +500,44 @@ public void testGetSnapshotInfo() throws IOException {
   @Test
   public void testExcludeUtilities() throws IOException {
     File noLinkFile = new File(followerSnapDir2, "noLink.sst");
-    File nonSstFile = new File(followerSnapDir2, "nonSstFile");
+
     // Confirm that the list of existing sst files is as expected.
-    List<String> existingSstList = getExistingFiles(candidateDir);
+    List<String> existingSstList = getExistingSstFiles(candidateDir);
     Set<String> existingSstFiles = new HashSet<>(existingSstList);
-    Set<String> expectedSstFileNames = new HashSet<>(Arrays.asList(
-        s1File.getName(),
-        noLinkFile.getName(),
-        f1File.getName(),
-        nonSstFile.getName()));
-    assertEquals(expectedSstFileNames, existingSstFiles);
+    int truncateLength = candidateDir.toString().length() + 1;
+    Set<String> expectedSstFiles = new HashSet<>(Arrays.asList(
+        s1File.toString().substring(truncateLength),
+        noLinkFile.toString().substring(truncateLength),
+        f1File.toString().substring(truncateLength)));
+    assertEquals(expectedSstFiles, existingSstFiles);
+
+    // Confirm that the excluded list is normalized as expected.
+    // (Normalizing means matches the layout on the leader.)
+    File leaderSstBackupDir = new File(leaderDir.toString(), "sstBackup");
+    assertTrue(leaderSstBackupDir.mkdirs());
+    File leaderTmpDir = new File(leaderDir.toString(), "tmp");
+    assertTrue(leaderTmpDir.mkdirs());
+    OMDBCheckpointServlet.DirectoryData sstBackupDir =
+        new OMDBCheckpointServlet.DirectoryData(leaderTmpDir.toPath(),
+            leaderSstBackupDir.toString());
+    Path srcSstBackup = Paths.get(sstBackupDir.getTmpDir().toString(),
+        "backup.sst");
+    Path destSstBackup = Paths.get(sstBackupDir.getOriginalDir().toString(),
+        "backup.sst");
+    truncateLength = leaderDir.toString().length() + 1;
+    existingSstList.add(truncateFileName(truncateLength, destSstBackup));
+    Map<String, Map<Path, Path>> normalizedMap =
+        OMDBCheckpointServlet.normalizeExcludeList(existingSstList,
+            leaderCheckpointDir.toPath(), sstBackupDir);
+    Map<String, Map<Path, Path>> expectedMap = new TreeMap<>();
+    Path s1 = Paths.get(leaderSnapDir1.toString(), "s1.sst");
+    Path noLink = Paths.get(leaderSnapDir2.toString(), "noLink.sst");
+    Path f1 = Paths.get(leaderCheckpointDir.toString(), "f1.sst");
+    expectedMap.put("s1.sst", ImmutableMap.of(s1, s1));
+    expectedMap.put("noLink.sst", ImmutableMap.of(noLink, noLink));
+    expectedMap.put("f1.sst", ImmutableMap.of(f1, f1));
+    expectedMap.put("backup.sst", ImmutableMap.of(srcSstBackup, destSstBackup));
+    assertEquals(expectedMap, new TreeMap<>(normalizedMap));
   }

   /*
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 3cd785e53a5..b4b96ff4d88 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.recon.spi.impl;

 import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DB_DIRS_PERMISSIONS_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE;
 import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_SNAPSHOT_DB;
@@ -84,7 +84,6 @@
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.DBUpdates;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort.Type;
 import org.apache.hadoop.ozone.recon.ReconContext;
@@ -197,11 +196,11 @@ public OzoneManagerServiceProviderImpl(
     HttpConfig.Policy policy = HttpConfig.getHttpPolicy(configuration);

     omDBSnapshotUrl = "http://" + ozoneManagerHttpAddress +
-        OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2;
+        OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;

     if (policy.isHttpsEnabled()) {
       omDBSnapshotUrl = "https://" + ozoneManagerHttpsAddress +
-          OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2;
+          OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
     }

     boolean flushParam = configuration.getBoolean(
@@ -414,7 +413,7 @@ public String getOzoneManagerSnapshotUrl() throws IOException {
           omLeaderUrl = (policy.isHttpsEnabled() ?
               "https://" + info.getServiceAddress(Type.HTTPS) :
               "http://" + info.getServiceAddress(Type.HTTP)) +
-              OZONE_DB_CHECKPOINT_HTTP_ENDPOINT_V2;
+              OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
         }
       }
     }
@@ -471,7 +470,6 @@ public DBCheckpoint getOzoneManagerDBSnapshot() {
       try (InputStream inputStream = reconUtils.makeHttpCall(
           connectionFactory, getOzoneManagerSnapshotUrl(), isOmSpnegoEnabled()).getInputStream()) {
         tarExtractor.extractTar(inputStream, untarredDbDir);
-        OmSnapshotUtils.createHardLinks(untarredDbDir, true);
       } catch (IOException | InterruptedException e) {
         reconContext.updateHealthStatus(new AtomicBoolean(false));
         reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);