diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java index 4c0e1a9c5017..675f8fbd398d 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java @@ -23,6 +23,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.file.Path; import java.util.Collection; import java.util.Comparator; import java.util.NoSuchElementException; @@ -51,11 +52,11 @@ */ public class SstFileSetReader { - private final Collection sstFiles; + private final Collection sstFiles; private volatile long estimatedTotalKeys = -1; - public SstFileSetReader(final Collection sstFiles) { + public SstFileSetReader(final Collection sstFiles) { this.sstFiles = sstFiles; } @@ -77,9 +78,9 @@ public long getEstimatedTotalKeys() throws RocksDBException { } try (ManagedOptions options = new ManagedOptions()) { - for (String sstFile : sstFiles) { + for (Path sstFile : sstFiles) { try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) { - fileReader.open(sstFile); + fileReader.open(sstFile.toAbsolutePath().toString()); estimatedSize += fileReader.getTableProperties().getNumEntries(); } } @@ -303,7 +304,7 @@ public int hashCode() { private abstract static class MultipleSstFileIterator> implements ClosableIterator { private final PriorityQueue> minHeap; - private MultipleSstFileIterator(Collection sstFiles) { + private MultipleSstFileIterator(Collection sstFiles) { this.minHeap = new PriorityQueue<>(); init(); initMinHeap(sstFiles); @@ -313,10 +314,10 @@ private MultipleSstFileIterator(Collection sstFiles) { protected abstract ClosableIterator getKeyIteratorForFile(String file) throws RocksDBException, 
IOException; - private void initMinHeap(Collection files) { + private void initMinHeap(Collection files) { try { - for (String file : files) { - ClosableIterator iterator = getKeyIteratorForFile(file); + for (Path file : files) { + ClosableIterator iterator = getKeyIteratorForFile(file.toAbsolutePath().toString()); HeapEntry entry = new HeapEntry<>(iterator); if (entry.getCurrentKey() != null) { diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java index 2a3ae61fa568..e11abc3fdb16 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java @@ -741,26 +741,25 @@ private void preconditionChecksForLoadAllCompactionLogs() { * exist in backup directory before being involved in compactions), * and appends the extension '.sst'. */ - private String getSSTFullPath(String sstFilenameWithoutExtension, Path... dbPaths) { + private Path getSSTFullPath(SstFileInfo sstFileInfo, Path... 
dbPaths) throws IOException { // Try to locate the SST in the backup dir first - final Path sstPathInBackupDir = Paths.get(sstBackupDir, sstFilenameWithoutExtension + SST_FILE_EXTENSION); + final Path sstPathInBackupDir = sstFileInfo.getFilePath(Paths.get(sstBackupDir).toAbsolutePath()); if (Files.exists(sstPathInBackupDir)) { - return sstPathInBackupDir.toString(); + return sstPathInBackupDir.toAbsolutePath(); } // SST file does not exist in the SST backup dir, this means the SST file // has not gone through any compactions yet and is only available in the // src DB directory or destDB directory for (Path dbPath : dbPaths) { - final Path sstPathInDBDir = dbPath.resolve(sstFilenameWithoutExtension + SST_FILE_EXTENSION); + final Path sstPathInDBDir = sstFileInfo.getFilePath(dbPath); if (Files.exists(sstPathInDBDir)) { - return sstPathInDBDir.toString(); + return sstPathInDBDir.toAbsolutePath(); } } - // TODO: More graceful error handling? - throw new RuntimeException("Unable to locate SST file: " + sstFilenameWithoutExtension); + throw new IOException("Unable to locate SST file: " + sstFileInfo); } /** @@ -772,15 +771,13 @@ private String getSSTFullPath(String sstFilenameWithoutExtension, Path... dbPath * @param dest destination snapshot * @param versionMap version map containing the connection between source snapshot version and dest snapshot version. * @param tablesToLookup tablesToLookup set of table (column family) names used to restrict which SST files to return. - * @param sstFilesDirForSnapDiffJob dir to create hardlinks for SST files - * for snapDiff job. * @return A list of SST files without extension. * e.g. 
["/path/to/sstBackupDir/000050.sst", * "/path/to/sstBackupDir/000060.sst"] */ - public synchronized Optional> getSSTDiffListWithFullPath(DifferSnapshotInfo src, + public synchronized Optional> getSSTDiffListWithFullPath(DifferSnapshotInfo src, DifferSnapshotInfo dest, Map versionMap, TablePrefixInfo prefixInfo, - Set tablesToLookup, String sstFilesDirForSnapDiffJob) throws IOException { + Set tablesToLookup) throws IOException { int srcVersion = src.getMaxVersion(); if (!versionMap.containsKey(srcVersion)) { throw new IOException("No corresponding dest version corresponding srcVersion : " + srcVersion + " in " + @@ -794,16 +791,15 @@ public synchronized Optional> getSSTDiffListWithFullPath(DifferSnap // of the sst file names. Optional> sstDiffList = getSSTDiffList(srcSnapshotVersion, destSnapshotVersion, prefixInfo, tablesToLookup, srcVersion == 0); - - return sstDiffList.map(diffList -> diffList.stream() - .map(sst -> { - String sstFullPath = getSSTFullPath(sst.getFileName(), srcSnapshotVersion.getDbPath(), - destSnapshotVersion.getDbPath()); - Path link = sst.getFilePath(Paths.get(sstFilesDirForSnapDiffJob)); - Path srcFile = Paths.get(sstFullPath); - createLink(link, srcFile); - return link.toString(); - }).collect(Collectors.toList())); + if (sstDiffList.isPresent()) { + Map sstFileInfoMap = new HashMap<>(); + for (SstFileInfo sstFileInfo : sstDiffList.get()) { + Path sstPath = getSSTFullPath(sstFileInfo, srcSnapshotVersion.getDbPath()); + sstFileInfoMap.put(sstPath, sstFileInfo); + } + return Optional.of(sstFileInfoMap); + } + return Optional.empty(); } /** diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java index c85c02f4c3b4..06870b2d8de3 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java +++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java @@ -69,12 +69,11 @@ public static Map filterRelevantSstFiles(Map /** * Filter sst files based on prefixes. The set of sst files to be filtered would be mutated. - * @param Type of the key in the map. * @param filesToBeFiltered sst files to be filtered. * @param tablesToLookup Set of column families to be included in the diff. * @param tablePrefixInfo TablePrefixInfo to filter irrelevant SST files. */ - public static Set filterRelevantSstFiles(Set filesToBeFiltered, + public static Set filterRelevantSstFiles(Set filesToBeFiltered, Set tablesToLookup, TablePrefixInfo tablePrefixInfo) { for (Iterator fileIterator = filesToBeFiltered.iterator(); fileIterator.hasNext();) { SstFileInfo sstFileInfo = fileIterator.next(); diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestSstFileSetReader.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestSstFileSetReader.java index 13b3e6dc5853..4cf008cadbcd 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestSstFileSetReader.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestSstFileSetReader.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue; import java.io.File; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -74,7 +75,7 @@ class TestSstFileSetReader { * @return Absolute path to the created SST file * @throws RocksDBException if there's an error during SST file creation */ - private String createRandomSSTFile(TreeMap keys) + private Path createRandomSSTFile(TreeMap keys) throws RocksDBException { File file = new File(tempDir, "tmp_sst_file" + fileCounter.incrementAndGet() + ".sst"); @@ -94,7 +95,7 @@ private String createRandomSSTFile(TreeMap keys) 
sstFileWriter.finish(); } assertTrue(file.exists()); - return file.getAbsolutePath(); + return file.getAbsoluteFile().toPath(); } /** @@ -121,8 +122,8 @@ private Map createKeys(int startRange, int endRange) { * @return Pair containing the complete sorted key map and list of SST file paths * @throws RocksDBException if there's an error during SST file creation */ - private Pair, List> createDummyData(int numberOfFiles) throws RocksDBException { - List files = new ArrayList<>(); + private Pair, List> createDummyData(int numberOfFiles) throws RocksDBException { + List files = new ArrayList<>(); int numberOfKeysPerFile = 1000; TreeMap keys = new TreeMap<>(createKeys(0, numberOfKeysPerFile * numberOfFiles)); @@ -136,7 +137,7 @@ private Pair, List> createDummyData(int numbe cnt += 1; } for (TreeMap fileKeys : fileKeysList) { - String tmpSSTFile = createRandomSSTFile(fileKeys); + Path tmpSSTFile = createRandomSSTFile(fileKeys); files.add(tmpSSTFile); } return Pair.of(keys, files); @@ -153,8 +154,8 @@ private Pair, List> createDummyData(int numbe @ValueSource(ints = {0, 1, 2, 3, 7, 10}) public void testGetKeyStream(int numberOfFiles) throws RocksDBException { - Pair, List> data = createDummyData(numberOfFiles); - List files = data.getRight(); + Pair, List> data = createDummyData(numberOfFiles); + List files = data.getRight(); SortedMap keys = data.getLeft(); // Getting every possible combination of 2 elements from the sampled keys. // Reading the sst file lying within the given bounds and @@ -195,9 +196,9 @@ public void testGetKeyStream(int numberOfFiles) public void testGetKeyStreamWithTombstone(int numberOfFiles) throws RocksDBException { assumeTrue(ManagedRawSSTFileReader.tryLoadLibrary()); - Pair, List> data = + Pair, List> data = createDummyData(numberOfFiles); - List files = data.getRight(); + List files = data.getRight(); SortedMap keys = data.getLeft(); // Getting every possible combination of 2 elements from the sampled keys. 
// Reading the sst file lying within the given bounds and @@ -237,7 +238,7 @@ public void testMinHeapWithOverlappingSstFiles(int numberOfFiles) throws RocksDB assumeTrue(numberOfFiles >= 2); // Create overlapping SST files with some duplicate keys - List files = new ArrayList<>(); + List files = new ArrayList<>(); Map expectedKeys = new TreeMap<>(); // File 0: keys 0-9 (all valid entries) @@ -305,7 +306,7 @@ public void testDuplicateKeyHandlingWithLatestFilePrecedence(int numberOfFiles) throws RocksDBException { assumeTrue(numberOfFiles >= 3); - List files = new ArrayList<>(); + List files = new ArrayList<>(); // All files will contain the same set of keys, but we expect the last file to "win" String[] testKeys = {KEY_PREFIX + "duplicate1", KEY_PREFIX + "duplicate2", KEY_PREFIX + "duplicate3"}; diff --git a/hadoop-ozone/ozone-manager/dev-support/findbugsExcludeFile.xml b/hadoop-ozone/ozone-manager/dev-support/findbugsExcludeFile.xml index 55abc2630178..739fd1f8b40d 100644 --- a/hadoop-ozone/ozone-manager/dev-support/findbugsExcludeFile.xml +++ b/hadoop-ozone/ozone-manager/dev-support/findbugsExcludeFile.xml @@ -16,4 +16,8 @@ limitations under the License. 
--> + + + + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index 3e6ccf771dc8..c3b9feae77fe 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -262,10 +262,6 @@ public OmSnapshotManager(OzoneManager ozoneManager) throws IOException { } this.ozoneManager = ozoneManager; - RocksDBCheckpointDiffer differ = ozoneManager - .getMetadataManager() - .getStore() - .getRocksDBCheckpointDiffer(); // Soft-limit of lru cache size this.softCacheSize = ozoneManager.getConfiguration().getInt( @@ -312,8 +308,8 @@ public OmSnapshotManager(OzoneManager ozoneManager) throws IOException { this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(), cacheCleanupServiceInterval, compactNonSnapshotDiffTables, ozoneManager.getMetadataManager().getLock()); - this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ, - ozoneManager, snapshotLocalDataManager, snapDiffJobCf, snapDiffReportCf, + this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, + ozoneManager, snapDiffJobCf, snapDiffReportCf, columnFamilyOptions, codecRegistry); diffCleanupServiceInterval = ozoneManager.getConfiguration() diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 219fc01f0a56..fa26953af2f5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -17,7 +17,6 @@ package org.apache.hadoop.ozone.om.snapshot; -import static 
java.util.stream.Collectors.toMap; import static org.apache.commons.lang3.StringUtils.leftPad; import static org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString; import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE; @@ -59,9 +58,6 @@ import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.DIFF_REPORT_GEN; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_FSO; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_OBS; -import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_DAG_WALK; -import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_FULL_DIFF; -import static org.apache.ozone.rocksdiff.RocksDiffUtils.filterRelevantSstFiles; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -76,15 +72,14 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; @@ -94,6 +89,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.io.file.PathUtils; import org.apache.commons.lang3.tuple.Pair; @@ -112,8 +109,6 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmSnapshot; -import 
org.apache.hadoop.ozone.om.OmSnapshotLocalData; -import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; @@ -122,18 +117,18 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.helpers.WithObjectID; import org.apache.hadoop.ozone.om.helpers.WithParentObjectId; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider; +import org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer; +import org.apache.hadoop.ozone.om.snapshot.diff.delta.DeltaFileComputer; import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse; import org.apache.hadoop.ozone.snapshot.ListSnapshotDiffJobResponse; import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus; import org.apache.hadoop.ozone.util.ClosableIterator; import org.apache.logging.log4j.util.Strings; -import org.apache.ozone.rocksdb.util.RdbUtil; import org.apache.ozone.rocksdb.util.SstFileInfo; import org.apache.ozone.rocksdb.util.SstFileSetReader; -import org.apache.ozone.rocksdiff.DifferSnapshotInfo; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.rocksdb.ColumnFamilyDescriptor; @@ -157,7 +152,6 @@ public class SnapshotDiffManager implements AutoCloseable { private static final String MODIFY_DIFF_TABLE_SUFFIX = "-modify-diff"; private final ManagedRocksDB db; - private final RocksDBCheckpointDiffer differ; private final OzoneManager ozoneManager; private final OMMetadataManager activeOmMetadataManager; private final CodecRegistry codecRegistry; @@ -201,22 
+195,17 @@ public class SnapshotDiffManager implements AutoCloseable { (SnapshotInfo fromSnapshotInfo, SnapshotInfo toSnapshotInfo) -> fromSnapshotInfo.getSnapshotId() + DELIMITER + toSnapshotInfo.getSnapshotId(); - private final OmSnapshotLocalDataManager snapshotLocalDataManager; @SuppressWarnings("parameternumber") public SnapshotDiffManager(ManagedRocksDB db, - RocksDBCheckpointDiffer differ, OzoneManager ozoneManager, - OmSnapshotLocalDataManager snapshotLocalDataManager, ColumnFamilyHandle snapDiffJobCfh, ColumnFamilyHandle snapDiffReportCfh, ManagedColumnFamilyOptions familyOptions, CodecRegistry codecRegistry) { this.db = db; - this.differ = differ; this.ozoneManager = ozoneManager; this.activeOmMetadataManager = ozoneManager.getMetadataManager(); - this.snapshotLocalDataManager = snapshotLocalDataManager; this.familyOptions = familyOptions; this.codecRegistry = codecRegistry; this.defaultWaitTime = ozoneManager.getConfiguration().getTimeDuration( @@ -345,52 +334,6 @@ private void createEmptySnapDiffDir(Path path) { } } - private void deleteDir(Path path) { - if (path == null || Files.notExists(path)) { - return; - } - - try { - PathUtils.deleteDirectory(path); - } catch (IOException e) { - // TODO: [SNAPSHOT] Fail gracefully - throw new IllegalStateException(e); - } - } - - /** - * Convert from SnapshotInfo to DifferSnapshotInfo. 
- */ - private static DifferSnapshotInfo getDSIFromSI(OMMetadataManager activeOmMetadataManager, - SnapshotInfo snapshotInfo, OmSnapshotLocalData snapshotLocalData) throws IOException { - final UUID snapshotId = snapshotInfo.getSnapshotId(); - final long dbTxSequenceNumber = snapshotInfo.getDbTxSequenceNumber(); - NavigableMap> versionSstFiles = snapshotLocalData.getVersionSstFileInfos() - .entrySet().stream().collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getSstFiles(), - (u, v) -> { - throw new IllegalStateException(String.format("Duplicate key %s", u)); - }, TreeMap::new)); - if (versionSstFiles.isEmpty()) { - throw new IOException(String.format("No versions found corresponding to %s", snapshotId)); - } - return new DifferSnapshotInfo( - version -> OmSnapshotManager.getSnapshotPath(activeOmMetadataManager, snapshotId, version), - snapshotId, dbTxSequenceNumber, versionSstFiles); - } - - @VisibleForTesting - protected Set getSSTFileSetForSnapshot(OmSnapshot snapshot, Set tablesToLookUp) { - return RdbUtil.getSSTFilesForComparison( - ((RDBStore)snapshot.getMetadataManager().getStore()).getDb().getManagedRocksDb(), tablesToLookUp); - } - - @VisibleForTesting - protected Map getSSTFileMapForSnapshot(OmSnapshot snapshot, - Set tablesToLookUp) throws IOException { - return RdbUtil.getSSTFilesWithInodesForComparison(((RDBStore)snapshot - .getMetadataManager().getStore()).getDb().getManagedRocksDb(), tablesToLookUp); - } - /** * Gets the report key for a particular index of snapshot diff job. */ @@ -842,17 +785,21 @@ void generateSnapshotDiffReport(final String jobKey, // hardlinks. JobId is used as dir name for uniqueness. // It is required to prevent that SST files get deleted for in_progress // job by RocksDBCheckpointDiffer#pruneOlderSnapshotsWithCompactionHistory. 
- Path path = Paths.get(sstBackupDirForSnapDiffJobs + "/" + jobId); + Path diffJobPath = Paths.get(sstBackupDirForSnapDiffJobs).resolve(jobId); UncheckedAutoCloseableSupplier rcFromSnapshot = null; UncheckedAutoCloseableSupplier rcToSnapshot = null; - try { + boolean useFullDiff = snapshotForceFullDiff || forceFullDiff; + boolean performNonNativeDiff = diffDisableNativeLibs || disableNativeDiff || !isNativeLibsLoaded; + + Consumer activityReporter = (jobStatus) -> recordActivity(jobKey, jobStatus); + try (DeltaFileComputer deltaFileComputer = new CompositeDeltaDiffComputer(ozoneManager.getOmSnapshotManager(), + activeOmMetadataManager, diffJobPath, activityReporter, useFullDiff, performNonNativeDiff)) { if (!areDiffJobAndSnapshotsActive(volumeName, bucketName, fromSnapshotName, toSnapshotName)) { return; } - rcFromSnapshot = ozoneManager.getOmSnapshotManager() .getActiveSnapshot(volumeName, bucketName, fromSnapshotName); @@ -866,8 +813,6 @@ void generateSnapshotDiffReport(final String jobKey, volumeName, bucketName, fromSnapshotName); SnapshotInfo tsInfo = getSnapshotInfo(ozoneManager, volumeName, bucketName, toSnapshotName); - - Files.createDirectories(path); // JobId is prepended to column families name to make them unique // for request. 
fromSnapshotColumnFamily = @@ -901,9 +846,6 @@ void generateSnapshotDiffReport(final String jobKey, fromSnapshot.getMetadataManager()); TablePrefixInfo tablePrefixes = toSnapshot.getMetadataManager().getTableBucketPrefix(volumeName, bucketName); - boolean useFullDiff = snapshotForceFullDiff || forceFullDiff; - boolean performNonNativeDiff = diffDisableNativeLibs || disableNativeDiff; - if (!areDiffJobAndSnapshotsActive(volumeName, bucketName, fromSnapshotName, toSnapshotName)) { return; @@ -945,22 +887,20 @@ void generateSnapshotDiffReport(final String jobKey, () -> { recordActivity(jobKey, OBJECT_ID_MAP_GEN_OBS); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsKeyTable, tsKeyTable, - fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, - performNonNativeDiff, tablePrefixes, + fsInfo, tsInfo, performNonNativeDiff, tablePrefixes, objectIdToKeyNameMapForFromSnapshot, objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap, - oldParentIds, newParentIds, path.toString(), jobKey); + oldParentIds, newParentIds, deltaFileComputer, jobKey); return null; }, () -> { if (bucketLayout.isFileSystemOptimized()) { recordActivity(jobKey, OBJECT_ID_MAP_GEN_FSO); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsDirTable, tsDirTable, - fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, - performNonNativeDiff, tablePrefixes, + fsInfo, tsInfo, performNonNativeDiff, tablePrefixes, objectIdToKeyNameMapForFromSnapshot, objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap, - oldParentIds, newParentIds, path.toString(), jobKey); + oldParentIds, newParentIds, deltaFileComputer, jobKey); } return null; }, @@ -1034,8 +974,6 @@ void generateSnapshotDiffReport(final String jobKey, dropAndCloseColumnFamilyHandle(fromSnapshotColumnFamily); dropAndCloseColumnFamilyHandle(toSnapshotColumnFamily); dropAndCloseColumnFamilyHandle(objectIDsColumnFamily); - // Delete SST files backup directory. 
- deleteDir(path); // Decrement ref counts if (rcFromSnapshot != null) { rcFromSnapshot.close(); @@ -1050,38 +988,22 @@ void generateSnapshotDiffReport(final String jobKey, private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( final Table fsTable, final Table tsTable, - final OmSnapshot fromSnapshot, final OmSnapshot toSnapshot, final SnapshotInfo fsInfo, final SnapshotInfo tsInfo, - final boolean useFullDiff, final boolean skipNativeDiff, - final TablePrefixInfo tablePrefixes, + boolean skipNativeDiff, final TablePrefixInfo tablePrefixes, final PersistentMap oldObjIdToKeyMap, final PersistentMap newObjIdToKeyMap, final PersistentMap objectIdToIsDirMap, - final Optional> oldParentIds, - final Optional> newParentIds, - final String diffDir, final String jobKey) throws IOException, RocksDBException { + final Optional> oldParentIds, final Optional> newParentIds, + final DeltaFileComputer deltaFileComputer, final String jobKey) throws IOException, RocksDBException { Set tablesToLookUp = Collections.singleton(fsTable.getName()); - Set deltaFiles = getDeltaFiles(fromSnapshot, toSnapshot, - tablesToLookUp, fsInfo, tsInfo, useFullDiff, tablePrefixes, diffDir, jobKey); - - // Workaround to handle deletes if native rocksDb tool for reading - // tombstone is not loaded. 
- // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone - if (skipNativeDiff || !isNativeLibsLoaded) { - Set inputFiles = filterRelevantSstFiles(getSSTFileSetForSnapshot(fromSnapshot, tablesToLookUp), - tablesToLookUp, tablePrefixes); - Path fromSnapshotPath = fromSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath(); - for (SstFileInfo sstFileInfo : inputFiles) { - deltaFiles.add(sstFileInfo.getFilePath(fromSnapshotPath).toAbsolutePath().toString()); - } - } + Collection> deltaFiles = deltaFileComputer.getDeltaFiles(fsInfo, tsInfo, + tablesToLookUp); if (LOG.isDebugEnabled()) { LOG.debug("Computed Delta SST File Set, Total count = {} ", deltaFiles.size()); } - addToObjectIdMap(fsTable, tsTable, deltaFiles, - !skipNativeDiff && isNativeLibsLoaded, - oldObjIdToKeyMap, newObjIdToKeyMap, objectIdToIsDirMap, oldParentIds, + addToObjectIdMap(fsTable, tsTable, deltaFiles.stream().map(Pair::getLeft).collect(Collectors.toList()), + !skipNativeDiff, oldObjIdToKeyMap, newObjIdToKeyMap, objectIdToIsDirMap, oldParentIds, newParentIds, tablePrefixes, jobKey); } @@ -1089,7 +1011,7 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( @SuppressWarnings("checkstyle:ParameterNumber") void addToObjectIdMap(Table fsTable, Table tsTable, - Set deltaFiles, boolean nativeRocksToolsLoaded, + Collection deltaFiles, boolean nativeRocksToolsLoaded, PersistentMap oldObjIdToKeyMap, PersistentMap newObjIdToKeyMap, PersistentMap objectIdToIsDirMap, @@ -1168,99 +1090,6 @@ void addToObjectIdMap(Table fsTable, } } - @VisibleForTesting - @SuppressWarnings("checkstyle:ParameterNumber") - Set getDeltaFiles(OmSnapshot fromSnapshot, - OmSnapshot toSnapshot, - Set tablesToLookUp, - SnapshotInfo fsInfo, - SnapshotInfo tsInfo, - boolean useFullDiff, - TablePrefixInfo tablePrefixInfo, - String diffDir, String jobKey) - throws IOException { - // TODO: [SNAPSHOT] Refactor the parameter list - Optional> deltaFiles = Optional.empty(); - // Check if compaction 
DAG is available, use that if so - if (differ != null && fsInfo != null && tsInfo != null && !useFullDiff) { - try (ReadableOmSnapshotLocalDataProvider snapLocalDataProvider = snapshotLocalDataManager.getOmSnapshotLocalData( - toSnapshot.getSnapshotID(), fromSnapshot.getSnapshotID())) { - OmSnapshotLocalData toSnapshotLocalData = snapLocalDataProvider.getSnapshotLocalData(); - OmSnapshotLocalData fromSnapshotLocalData = snapLocalDataProvider.getPreviousSnapshotLocalData(); - // Construct DifferSnapshotInfo - final DifferSnapshotInfo fromDSI = getDSIFromSI(activeOmMetadataManager, fsInfo, fromSnapshotLocalData); - final DifferSnapshotInfo toDSI = getDSIFromSI(activeOmMetadataManager, tsInfo, toSnapshotLocalData); - - recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK); - LOG.debug("Calling RocksDBCheckpointDiffer"); - final Map versionMap = toSnapshotLocalData.getVersionSstFileInfos().entrySet() - .stream().collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getPreviousSnapshotVersion())); - deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, versionMap, tablePrefixInfo, tablesToLookUp, - diffDir).map(HashSet::new); - } catch (Exception exception) { - recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF); - LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. " + - "It will fallback to full diff now.", exception); - } - } - - if (useFullDiff || !deltaFiles.isPresent()) { - // If compaction DAG is not available (already cleaned up), fall back to - // the slower approach. 
- if (!useFullDiff) { - LOG.warn("RocksDBCheckpointDiffer is not available, falling back to" + - " slow path"); - } - recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF); - Set diffFiles = getDiffFiles(fromSnapshot, toSnapshot, tablesToLookUp, tablePrefixInfo); - deltaFiles = Optional.of(diffFiles); - } - - return deltaFiles.orElseThrow(() -> - new IOException("Error getting diff files b/w " + fromSnapshot.getSnapshotTableKey() + " and " + - toSnapshot.getSnapshotTableKey())); - } - - private Set getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot toSnapshot, Set tablesToLookUp, - TablePrefixInfo tablePrefixInfo) { - Set diffFiles; - Path fromSnapshotPath = fromSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath(); - Path toSnapshotPath = toSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath(); - try { - diffFiles = new HashSet<>(); - Map fromSnapshotFiles = filterRelevantSstFiles(getSSTFileMapForSnapshot(fromSnapshot, - tablesToLookUp), tablesToLookUp, tablePrefixInfo); - Map toSnapshotFiles = filterRelevantSstFiles(getSSTFileMapForSnapshot(toSnapshot, - tablesToLookUp), tablesToLookUp, tablePrefixInfo); - for (Map.Entry entry : fromSnapshotFiles.entrySet()) { - if (!toSnapshotFiles.containsKey(entry.getKey())) { - diffFiles.add(entry.getValue().getFilePath(fromSnapshotPath).toAbsolutePath().toString()); - } - } - for (Map.Entry entry : toSnapshotFiles.entrySet()) { - if (!fromSnapshotFiles.containsKey(entry.getKey())) { - diffFiles.add(entry.getValue().getFilePath(toSnapshotPath).toAbsolutePath().toString()); - } - } - } catch (IOException e) { - // In case of exception during inode read use all files - LOG.error("Exception occurred while populating delta files for snapDiff", e); - LOG.warn("Falling back to full file list comparison, inode-based optimization skipped."); - Set fromSnapshotFiles = filterRelevantSstFiles(getSSTFileSetForSnapshot(fromSnapshot, - tablesToLookUp), tablesToLookUp, tablePrefixInfo); - 
Set toSnapshotFiles = filterRelevantSstFiles(getSSTFileSetForSnapshot(toSnapshot, - tablesToLookUp), tablesToLookUp, tablePrefixInfo); - diffFiles = new HashSet<>(); - for (SstFileInfo sstFileInfo : fromSnapshotFiles) { - diffFiles.add(sstFileInfo.getFilePath(fromSnapshotPath).toAbsolutePath().toString()); - } - for (SstFileInfo sstFileInfo : toSnapshotFiles) { - diffFiles.add(sstFileInfo.getFilePath(toSnapshotPath).toAbsolutePath().toString()); - } - } - return diffFiles; - } - private void validateEstimatedKeyChangesAreInLimits( SstFileSetReader sstFileReader ) throws RocksDBException, IOException { @@ -1599,7 +1428,7 @@ private synchronized void updateJobStatus(String jobKey, } synchronized void recordActivity(String jobKey, - SnapshotDiffResponse.SubStatus subStatus) { + SubStatus subStatus) { SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); snapshotDiffJob.setSubStatus(subStatus); snapDiffJobTable.put(jobKey, snapshotDiffJob); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/CompositeDeltaDiffComputer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/CompositeDeltaDiffComputer.java new file mode 100644 index 000000000000..4ef17d841141 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/CompositeDeltaDiffComputer.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot.diff.delta; + +import static org.apache.hadoop.ozone.om.snapshot.diff.delta.FullDiffComputer.getSSTFileSetForSnapshot; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.TablePrefixInfo; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmSnapshot; +import org.apache.hadoop.ozone.om.OmSnapshotManager; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; +import org.apache.ozone.rocksdb.util.SstFileInfo; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CompositeDeltaDiffComputer is responsible for computing the delta file + * differences between two snapshots, utilizing different strategies such + * as partial differ computation and full differ computation. + * + * It serves as an orchestrator to decide whether to perform a full diff + * or a more efficient partial diff, and handles fallback mechanisms if + * the chosen method fails. 
+ * + * The class leverages two main difference computation strategies: + * - {@code RDBDifferComputer} for partial diff computation + * - {@code FullDiffComputer} for exhaustive diff + * + * This class also includes support for handling non-native diff scenarios + * through additional processing of input files from the "from" snapshot + * when native RocksDB tools are not used. + * + * Inherits from {@code FileLinkDeltaFileComputer} and implements the + * functionality for computing delta files and resource management. + */ +public class CompositeDeltaDiffComputer extends FileLinkDeltaFileComputer { + + private static final Logger LOG = LoggerFactory.getLogger(CompositeDeltaDiffComputer.class); + + private final RDBDifferComputer differComputer; + private final FullDiffComputer fullDiffComputer; + private final boolean nonNativeDiff; + + public CompositeDeltaDiffComputer(OmSnapshotManager snapshotManager, + OMMetadataManager activeMetadataManager, Path deltaDirPath, + Consumer activityReporter, boolean fullDiff, + boolean nonNativeDiff) throws IOException { + super(snapshotManager, activeMetadataManager, deltaDirPath, activityReporter); + differComputer = fullDiff ? 
null : new RDBDifferComputer(snapshotManager, activeMetadataManager, + deltaDirPath.resolve("rdbDiffer"), activityReporter); + fullDiffComputer = new FullDiffComputer(snapshotManager, activeMetadataManager, + deltaDirPath.resolve("fullDiff"), activityReporter); + this.nonNativeDiff = nonNativeDiff; + } + + @Override + Optional>> computeDeltaFiles(SnapshotInfo fromSnapshotInfo, + SnapshotInfo toSnapshotInfo, Set tablesToLookup, TablePrefixInfo tablePrefixInfo) throws IOException { + Map> deltaFiles = null; + try { + if (differComputer != null) { + updateActivity(SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_DAG_WALK); + deltaFiles = differComputer.computeDeltaFiles(fromSnapshotInfo, toSnapshotInfo, tablesToLookup, + tablePrefixInfo).orElse(null); + } + } catch (Exception e) { + LOG.warn("Falling back to full diff.", e); + } + if (deltaFiles == null) { + updateActivity(SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_FULL_DIFF); + deltaFiles = fullDiffComputer.computeDeltaFiles(fromSnapshotInfo, toSnapshotInfo, tablesToLookup, + tablePrefixInfo).orElse(null); + if (deltaFiles == null) { + // FileLinkDeltaFileComputer would throw an exception in this case. + return Optional.empty(); + } + } + // Workaround to handle deletes if native rocksDb tool for reading + // tombstone is not loaded. + // When performing non native diff, input files of from snapshot needs to be added. 
+ if (nonNativeDiff) { + try (UncheckedAutoCloseableSupplier fromSnapshot = getSnapshot(fromSnapshotInfo)) { + Set fromSnapshotFiles = getSSTFileSetForSnapshot(fromSnapshot.get(), tablesToLookup, + tablePrefixInfo); + Path fromSnapshotPath = fromSnapshot.get().getMetadataManager().getStore().getDbLocation() + .getAbsoluteFile().toPath(); + for (SstFileInfo sstFileInfo : fromSnapshotFiles) { + Path source = sstFileInfo.getFilePath(fromSnapshotPath); + deltaFiles.put(source, Pair.of(createLink(source), sstFileInfo)); + } + } + } + return Optional.of(deltaFiles); + } + + @Override + public void close() throws IOException { + if (differComputer != null) { + differComputer.close(); + } + if (fullDiffComputer != null) { + fullDiffComputer.close(); + } + super.close(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FileLinkDeltaFileComputer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FileLinkDeltaFileComputer.java index ff4bac52dba7..a6860574339e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FileLinkDeltaFileComputer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FileLinkDeltaFileComputer.java @@ -49,7 +49,8 @@ /** * The {@code FileLinkDeltaFileComputer} is an abstract class that provides a * base implementation for the {@code DeltaFileComputer} interface. It is - * responsible for computing delta files by creating hard links to the + * responsible for computing delta files (a list of files if read completely would be able to completely + * compute all the key changes between two snapshots). Hard links to the * relevant source files in a specified delta directory, enabling a compact * representation of changes between snapshots. 
* @@ -63,16 +64,16 @@ public abstract class FileLinkDeltaFileComputer implements DeltaFileComputer { private final OmSnapshotManager omSnapshotManager; private final OMMetadataManager activeMetadataManager; private final Consumer activityReporter; - private final Path deltaDir; + private final Path tmpDeltaFileLinkDir; private final AtomicInteger linkFileCounter = new AtomicInteger(0); FileLinkDeltaFileComputer(OmSnapshotManager snapshotManager, OMMetadataManager activeMetadataManager, Path deltaDirPath, Consumer activityReporter) throws IOException { - this.deltaDir = deltaDirPath.toAbsolutePath(); + this.tmpDeltaFileLinkDir = deltaDirPath.toAbsolutePath(); this.omSnapshotManager = snapshotManager; this.activityReporter = activityReporter; this.activeMetadataManager = activeMetadataManager; - createDirectories(deltaDir); + createDirectories(tmpDeltaFileLinkDir); } /** @@ -119,7 +120,7 @@ Path createLink(Path path) throws IOException { String extension = getExtension(fileName.toString()); extension = StringUtils.isBlank(extension) ? "" : ("." 
+ extension); do { - link = deltaDir.resolve(linkFileCounter.incrementAndGet() + extension); + link = tmpDeltaFileLinkDir.resolve(linkFileCounter.incrementAndGet() + extension); try { Files.createLink(link, source); createdLink = true; @@ -147,9 +148,9 @@ OMMetadataManager getActiveMetadataManager() { @Override public void close() throws IOException { - if (deltaDir == null || Files.notExists(deltaDir)) { + if (tmpDeltaFileLinkDir == null || Files.notExists(tmpDeltaFileLinkDir)) { return; } - deleteDirectory(deltaDir); + deleteDirectory(tmpDeltaFileLinkDir); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FullDiffComputer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FullDiffComputer.java index e3c6c0dcae46..6beb5f7dc9b9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FullDiffComputer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/FullDiffComputer.java @@ -46,6 +46,8 @@ * in files and generates corresponding links for easier processing of snapshot diffs. * This implementation handles cases of optimized inode-based comparisons as well as * fallback with full file list comparisons in case of exceptions. + * The delta files would be all files which are present in the source snapshot and not present in the target snapshot + * and vice versa. 
*/ class FullDiffComputer extends FileLinkDeltaFileComputer { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/RDBDifferComputer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/RDBDifferComputer.java new file mode 100644 index 000000000000..0a59029fb0f4 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/RDBDifferComputer.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot.diff.delta; + +import static java.util.stream.Collectors.toMap; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.TablePrefixInfo; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmSnapshotLocalData; +import org.apache.hadoop.ozone.om.OmSnapshotManager; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus; +import org.apache.ozone.rocksdb.util.SstFileInfo; +import org.apache.ozone.rocksdiff.DifferSnapshotInfo; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; + +/** + * Computes RocksDB SST file differences between two snapshots and materializes + * differing SST files as hard links in the configured delta directory. + * + *

This class uses {@link RocksDBCheckpointDiffer} to obtain the list of SST + * files that differ between a "from" and a "to" snapshot. It opens local + * snapshot metadata via {@link #getLocalDataProvider}, and delegates the + * comparison to the differ to compute the delta files.

+ * + *

Each source SST file returned by the differ is linked into the delta + * directory using {@link FileLinkDeltaFileComputer#createLink(Path)}, and + * {@link #computeDeltaFiles} returns a map from each source path to a pair of + * its link path and SST file info. The implementation synchronizes on the + * internal {@code differ} instance because the differ is not assumed to be thread-safe.

+ */ +class RDBDifferComputer extends FileLinkDeltaFileComputer { + + private final RocksDBCheckpointDiffer differ; + + RDBDifferComputer(OmSnapshotManager omSnapshotManager, OMMetadataManager activeMetadataManager, + Path deltaDirPath, Consumer activityReporter) throws IOException { + super(omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter); + this.differ = activeMetadataManager.getStore().getRocksDBCheckpointDiffer(); + } + + @Override + public Optional>> computeDeltaFiles(SnapshotInfo fromSnapshot, + SnapshotInfo toSnapshot, Set tablesToLookup, TablePrefixInfo tablePrefixInfo) throws IOException { + if (differ != null) { + try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider snapProvider = + getLocalDataProvider(toSnapshot.getSnapshotId(), fromSnapshot.getSnapshotId())) { + final DifferSnapshotInfo fromDSI = getDSIFromSI(getActiveMetadataManager(), fromSnapshot, + snapProvider.getPreviousSnapshotLocalData()); + final DifferSnapshotInfo toDSI = getDSIFromSI(getActiveMetadataManager(), toSnapshot, + snapProvider.getSnapshotLocalData()); + final Map versionMap = snapProvider.getSnapshotLocalData().getVersionSstFileInfos().entrySet() + .stream().collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getPreviousSnapshotVersion())); + synchronized (differ) { + Optional> paths = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, versionMap, + tablePrefixInfo, tablesToLookup); + if (paths.isPresent()) { + Map> links = new HashMap<>(paths.get().size()); + for (Map.Entry source : paths.get().entrySet()) { + links.put(source.getKey(), Pair.of(createLink(source.getKey()), source.getValue())); + } + return Optional.of(links); + } + } + } + } + return Optional.empty(); + } + + /** + * Convert from SnapshotInfo to DifferSnapshotInfo. 
+ */ + private static DifferSnapshotInfo getDSIFromSI(OMMetadataManager activeOmMetadataManager, + SnapshotInfo snapshotInfo, OmSnapshotLocalData snapshotLocalData) throws IOException { + final UUID snapshotId = snapshotInfo.getSnapshotId(); + final long dbTxSequenceNumber = snapshotInfo.getDbTxSequenceNumber(); + NavigableMap> versionSstFiles = snapshotLocalData.getVersionSstFileInfos().entrySet() + .stream().collect(toMap(Map.Entry::getKey, + entry -> entry.getValue().getSstFiles(), (u, v) -> { + throw new IllegalStateException(String.format("Duplicate key %s", u)); + }, TreeMap::new)); + if (versionSstFiles.isEmpty()) { + throw new IOException(String.format("No versions found corresponding to %s", snapshotId)); + } + return new DifferSnapshotInfo( + version -> OmSnapshotManager.getSnapshotPath(activeOmMetadataManager, snapshotId, version), + snapshotId, dbTxSequenceNumber, versionSstFiles); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index b484ad628c72..440081f269a7 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -49,7 +49,6 @@ import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED; -import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION; import static org.apache.ratis.util.JavaUtils.attempt; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -59,17 +58,11 @@ import static 
org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.anyDouble; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyMap; -import static org.mockito.Mockito.anySet; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; @@ -80,17 +73,15 @@ import com.google.common.cache.CacheLoader; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import jakarta.annotation.Nonnull; import java.io.File; import java.io.IOException; -import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -105,7 +96,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -129,7 +119,6 @@ import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMMetrics; import org.apache.hadoop.ozone.om.OmSnapshot; -import org.apache.hadoop.ozone.om.OmSnapshotLocalData; import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -140,7 +129,6 @@ import 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.helpers.WithParentObjectId; import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider; import org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap; import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse; import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage; @@ -151,17 +139,10 @@ import org.apache.hadoop.ozone.util.ClosableIterator; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ExitUtil; -import org.apache.ozone.rocksdb.util.RdbUtil; -import org.apache.ozone.rocksdb.util.SstFileInfo; import org.apache.ozone.rocksdb.util.SstFileSetReader; -import org.apache.ozone.rocksdiff.DifferSnapshotInfo; -import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; -import org.apache.ozone.rocksdiff.RocksDiffUtils; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -171,19 +152,15 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.mockito.stubbing.Answer; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import 
org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; /** * Tests for SnapshotDiffManager. @@ -210,10 +187,6 @@ public class TestSnapshotDiffManager { private final OMMetrics omMetrics = OMMetrics.create(); @TempDir private File dbDir; - @TempDir - private File snapDiffDir; - @Mock - private RocksDBCheckpointDiffer differ; @Mock private OMMetadataManager omMetadataManager; @Mock @@ -231,15 +204,6 @@ public class TestSnapshotDiffManager { @Mock private RDBStore dbStore; - @Mock - private RocksIterator jobTableIterator; - - @Mock - private OmSnapshotLocalDataManager localDataManager; - - @Mock - private OmSnapshotManager omSnapshotManager; - private static CodecRegistry codecRegistry; private final BiFunction @@ -289,7 +253,6 @@ public void init() throws RocksDBException, IOException, ExecutionException { String snapshotNamePrefix = "snap-"; String snapshotPath = "snapshotPath"; - String snapshotCheckpointDir = "snapshotCheckpointDir"; UUID baseSnapshotId = UUID.randomUUID(); String baseSnapshotName = snapshotNamePrefix + baseSnapshotId; snapshotInfo = new SnapshotInfo.Builder() @@ -375,7 +338,7 @@ public void init() throws RocksDBException, IOException, ExecutionException { when(ozoneManager.getConfiguration()).thenReturn(configuration); when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); - omSnapshotManager = mock(OmSnapshotManager.class); + OmSnapshotManager omSnapshotManager = mock(OmSnapshotManager.class); when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager); SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true, new OmReadOnlyLock()); @@ -387,7 +350,7 @@ public void init() throws RocksDBException, IOException, ExecutionException { return snapshotCache.get(snapInfo.getSnapshotId()); }); when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager); - snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager, localDataManager, + snapshotDiffManager = new 
SnapshotDiffManager(db, ozoneManager, snapDiffJobTable, snapDiffReportTable, columnFamilyOptions, codecRegistry); when(omSnapshotManager.getDiffCleanupServiceInterval()).thenReturn(0L); } @@ -418,236 +381,6 @@ private OmSnapshot getMockedOmSnapshot(UUID snapshotId) { return omSnapshot; } - private SnapshotInfo getMockedSnapshotInfo(UUID snapshotId) { - SnapshotInfo snapInfo = mock(SnapshotInfo.class); - when(snapInfo.getSnapshotId()).thenReturn(snapshotId); - return snapInfo; - } - - @ParameterizedTest - @ValueSource(ints = {0, 1, 2, 5, 10, 100, 1000, 10000}) - public void testGetDeltaFilesWithDag(int numberOfFiles) throws IOException { - UUID snap1 = UUID.randomUUID(); - UUID snap2 = UUID.randomUUID(); - when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap1.toString()))) - .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap1.toString(), snap2)); - when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap2.toString()))) - .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap2.toString(), snap2)); - - String diffDir = snapDiffDir.getAbsolutePath(); - String diffJobKey = snap1 + DELIMITER + snap2; - Set randomStrings = IntStream.range(0, numberOfFiles) - .mapToObj(i -> RandomStringUtils.secure().nextAlphabetic(10)) - .collect(Collectors.toSet()); - - when(differ.getSSTDiffListWithFullPath( - any(DifferSnapshotInfo.class), - any(DifferSnapshotInfo.class), - anyMap(), - any(TablePrefixInfo.class), - anySet(), - eq(diffDir)) - ).thenReturn(Optional.of(Lists.newArrayList(randomStrings))); - mockSnapshotLocalData(); - UncheckedAutoCloseableSupplier rcFromSnapshot = - omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, snap1.toString()); - UncheckedAutoCloseableSupplier rcToSnapshot = - omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, snap2.toString()); - OmSnapshot fromSnapshot = rcFromSnapshot.get(); - OmSnapshot toSnapshot = rcToSnapshot.get(); - - SnapshotInfo 
fromSnapshotInfo = getMockedSnapshotInfo(snap1); - SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap2); - when(jobTableIterator.isValid()).thenReturn(false); - - SnapshotDiffManager spy = spy(snapshotDiffManager); - doNothing().when(spy).recordActivity(any(), any()); - doNothing().when(spy).updateProgress(anyString(), anyDouble()); - Set deltaFiles = spy.getDeltaFiles( - fromSnapshot, - toSnapshot, - Sets.newHashSet("cf1", "cf2"), fromSnapshotInfo, - toSnapshotInfo, false, - new TablePrefixInfo(Collections.emptyMap()), diffDir, diffJobKey); - assertEquals(randomStrings, deltaFiles); - - rcFromSnapshot.close(); - rcToSnapshot.close(); - } - - private void mockSnapshotLocalData() throws IOException { - OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class); - ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); - when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(localData); - when(snapProvider.getSnapshotLocalData()).thenReturn(localData); - OmSnapshotLocalData.VersionMeta versionMeta = mock(OmSnapshotLocalData.VersionMeta.class); - when(versionMeta.getSstFiles()).thenReturn(Collections.emptyList()); - when(localData.getVersionSstFileInfos()).thenReturn(ImmutableMap.of(0, versionMeta)); - when(localDataManager.getOmSnapshotLocalData(any(UUID.class), any(UUID.class))).thenReturn(snapProvider); - } - - @ParameterizedTest - @CsvSource({"0,true", "1,true", "2,true", "5,true", "10,true", "100,true", - "1000,true", "10000,true", "0,false", "1,false", "2,false", "5,false", - "10,false", "100,false", "1000,false", "10000,false"}) - public void testGetDeltaFilesWithFullDiff(int numberOfFiles, - boolean useFullDiff) - throws IOException { - try (MockedStatic mockedRdbUtil = mockStatic(RdbUtil.class); - MockedStatic mockedRocksDiffUtils = - mockStatic(RocksDiffUtils.class)) { - Set deltaStrings = new HashSet<>(); - - mockedRdbUtil.when( - () -> RdbUtil.getSSTFilesWithInodesForComparison(any(), anySet())) 
- .thenAnswer(invocation -> { - Map retVal = IntStream.range(0, numberOfFiles) - .mapToObj(i -> RandomStringUtils.secure().nextAlphabetic(10)) - .collect(Collectors.toMap(Function.identity(), - i -> new SstFileInfo(i, null, null, null))); - deltaStrings.addAll(retVal.keySet().stream().map(Object::toString).collect(Collectors.toSet())); - return retVal; - }); - - mockedRocksDiffUtils.when(() -> - RocksDiffUtils.filterRelevantSstFiles(anyMap(), anySet(), any())) - .thenAnswer(invocationOnMock -> { - invocationOnMock.getArgument(0, Map.class).entrySet().stream() - .findAny().ifPresent(val -> { - Map.Entry entry = (Map.Entry) val; - assertTrue(deltaStrings.contains(entry.getKey())); - invocationOnMock.getArgument(0, Map.class).remove(entry.getKey()); - deltaStrings.remove(entry.getKey()); - }); - return invocationOnMock.getArgument(0, Map.class); - }); - UUID snap1 = UUID.randomUUID(); - UUID snap2 = UUID.randomUUID(); - String diffJobKey = snap1 + DELIMITER + snap2; - when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap1.toString()))) - .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap1.toString(), snap2)); - when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap2.toString()))) - .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap2.toString(), snap2)); - if (!useFullDiff) { - when(differ.getSSTDiffListWithFullPath( - any(DifferSnapshotInfo.class), - any(DifferSnapshotInfo.class), - anyMap(), - any(TablePrefixInfo.class), - anySet(), - anyString())) - .thenReturn(Optional.empty()); - } - mockSnapshotLocalData(); - UncheckedAutoCloseableSupplier rcFromSnapshot = - omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, snap1.toString()); - UncheckedAutoCloseableSupplier rcToSnapshot = - omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, snap2.toString()); - OmSnapshot fromSnapshot = rcFromSnapshot.get(); - OmSnapshot toSnapshot = rcToSnapshot.get(); - - 
SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); - SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); - when(jobTableIterator.isValid()).thenReturn(false); - SnapshotDiffManager spy = spy(snapshotDiffManager); - doNothing().when(spy).recordActivity(any(), any()); - doNothing().when(spy).updateProgress(anyString(), anyDouble()); - Set deltaFiles = spy.getDeltaFiles( - fromSnapshot, - toSnapshot, - Sets.newHashSet("cf1", "cf2"), - fromSnapshotInfo, - toSnapshotInfo, - false, - new TablePrefixInfo(Collections.emptyMap()), - snapDiffDir.getAbsolutePath(), diffJobKey); - assertEquals(deltaStrings.stream() - .map(i -> dbStore.getDbLocation().toPath().resolve(i + SST_FILE_EXTENSION).toAbsolutePath().toString()) - .collect(Collectors.toSet()), deltaFiles); - if (useFullDiff && numberOfFiles > 1) { - assertThat(deltaFiles).isNotEmpty(); - } - } - } - - @ParameterizedTest - @ValueSource(ints = {0, 1, 2, 5, 10, 100, 1000, 10000}) - public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) - throws IOException { - try (MockedStatic mockedRdbUtil = mockStatic(RdbUtil.class); - MockedStatic mockedRocksDiffUtils = - mockStatic(RocksDiffUtils.class)) { - Set deltaStrings = new HashSet<>(); - - mockedRdbUtil.when( - () -> RdbUtil.getSSTFilesForComparison(any(), anySet())) - .thenAnswer((Answer>) invocation -> { - Set retVal = IntStream.range(0, numberOfFiles) - .mapToObj(i -> RandomStringUtils.secure().nextAlphabetic(10)) - .collect(Collectors.toSet()); - deltaStrings.addAll(retVal); - return retVal; - }); - - mockedRocksDiffUtils.when(() -> - RocksDiffUtils.filterRelevantSstFiles(anySet(), anySet(), any())) - .thenAnswer((Answer) invocationOnMock -> { - invocationOnMock.getArgument(0, Set.class).stream() - .findAny().ifPresent(val -> { - assertTrue(deltaStrings.contains(val)); - invocationOnMock.getArgument(0, Set.class).remove(val); - deltaStrings.remove(val); - }); - return null; - }); - UUID snap1 = UUID.randomUUID(); - UUID snap2 = 
UUID.randomUUID(); - when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap1.toString()))) - .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap1.toString(), snap1)); - when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap2.toString()))) - .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap2.toString(), snap2)); - - doThrow(new RuntimeException("File not found exception.")) - .when(differ) - .getSSTDiffListWithFullPath( - any(DifferSnapshotInfo.class), - any(DifferSnapshotInfo.class), - anyMap(), - any(TablePrefixInfo.class), - anySet(), - anyString()); - - UncheckedAutoCloseableSupplier rcFromSnapshot = - omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, snap1.toString()); - UncheckedAutoCloseableSupplier rcToSnapshot = - omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, snap2.toString()); - OmSnapshot fromSnapshot = rcFromSnapshot.get(); - OmSnapshot toSnapshot = rcToSnapshot.get(); - - SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); - SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); - when(jobTableIterator.isValid()).thenReturn(false); - String diffJobKey = snap1 + DELIMITER + snap2; - SnapshotDiffManager spy = spy(snapshotDiffManager); - doNothing().when(spy).recordActivity(any(), any()); - doNothing().when(spy).updateProgress(anyString(), anyDouble()); - mockSnapshotLocalData(); - Set deltaFiles = spy.getDeltaFiles( - fromSnapshot, - toSnapshot, - Sets.newHashSet("cf1", "cf2"), - fromSnapshotInfo, - toSnapshotInfo, - false, - new TablePrefixInfo(Collections.emptyMap()), - snapDiffDir.getAbsolutePath(), diffJobKey); - assertEquals(deltaStrings, deltaFiles); - - rcFromSnapshot.close(); - rcToSnapshot.close(); - } - } - private Table getMockedTable( Map map, String tableName) throws IOException { @@ -746,7 +479,7 @@ public void testObjectIdMapWithTombstoneEntries(boolean nativeLibraryLoaded, Set newParentIds = Sets.newHashSet(); 
spy.addToObjectIdMap(toSnapshotTable, - fromSnapshotTable, Sets.newHashSet("dummy.sst"), + fromSnapshotTable, Sets.newHashSet(Paths.get("dummy.sst")), nativeLibraryLoaded, oldObjectIdKeyMap, newObjectIdKeyMap, objectIdsToCheck, Optional.of(oldParentIds), Optional.of(newParentIds), @@ -1562,84 +1295,6 @@ private void setupMocksForRunningASnapDiff( when(bucketInfoTable.get(bucketKey)).thenReturn(bucketInfo); } - @Test - public void testGetDeltaFilesWithFullDiff() throws IOException { - SnapshotDiffManager spy = spy(snapshotDiffManager); - UUID snap1 = UUID.randomUUID(); - OmSnapshot fromSnapshot = getMockedOmSnapshot(snap1); - Path fromSnapshotPath = fromSnapshot.getMetadataManager().getStore().getDbLocation().toPath(); - UUID snap2 = UUID.randomUUID(); - OmSnapshot toSnapshot = getMockedOmSnapshot(snap2); - Path toSnapshotPath = toSnapshot.getMetadataManager().getStore().getDbLocation().toPath(); - Mockito.doAnswer(invocation -> { - OmSnapshot snapshot = invocation.getArgument(0); - if (snapshot == fromSnapshot) { - Map inodeToFileMap = new HashMap<>(); - inodeToFileMap.put(1, new SstFileInfo("1", null, null, null)); - inodeToFileMap.put(2, new SstFileInfo("2", null, null, null)); - inodeToFileMap.put(3, new SstFileInfo("3", null, null, null)); - return inodeToFileMap; - } - if (snapshot == toSnapshot) { - Map inodeToFileMap = new HashMap<>(); - inodeToFileMap.put(1, new SstFileInfo("10", null, null, null)); - inodeToFileMap.put(2, new SstFileInfo("20", null, null, null)); - inodeToFileMap.put(4, new SstFileInfo("4", null, null, null)); - return inodeToFileMap; - } - return null; - }).when(spy).getSSTFileMapForSnapshot(Mockito.any(OmSnapshot.class), Mockito.anySet()); - doNothing().when(spy).recordActivity(any(), any()); - doNothing().when(spy).updateProgress(anyString(), anyDouble()); - String diffJobKey = snap1 + DELIMITER + snap2; - - Set deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, Collections.emptySet(), snapshotInfo, - snapshotInfo, true, new 
TablePrefixInfo(Collections.emptyMap()), null, diffJobKey); - Assertions.assertEquals(Sets.newHashSet(fromSnapshotPath.resolve("3.sst").toAbsolutePath().toString(), - toSnapshotPath.resolve("4.sst").toAbsolutePath().toString()), deltaFiles); - } - - @Test - public void testGetSnapshotDiffReportHappyCase() throws Exception { - SnapshotInfo fromSnapInfo = snapshotInfo; - SnapshotInfo toSnapInfo = snapshotInfoList.get(0); - - Set testDeltaFiles = new HashSet<>(); - - SnapshotDiffManager spy = spy(snapshotDiffManager); - - doReturn(testDeltaFiles).when(spy).getDeltaFiles(any(OmSnapshot.class), - any(OmSnapshot.class), anySet(), eq(fromSnapInfo), eq(toSnapInfo), - eq(false), any(), anyString(), - anyString()); - - doReturn(testDeltaFiles).when(spy) - .getSSTFileSetForSnapshot(any(OmSnapshot.class), anySet()); - - doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable), - any(), anyBoolean(), any(), any(), any(), any(), any(), any(), anyString()); - doNothing().when(spy).checkReportsIntegrity(any(), anyInt(), anyInt()); - - doReturn(10L).when(spy).generateDiffReport(anyString(), - any(), any(), any(), any(), any(), any(), any(), - anyString(), anyString(), anyString(), anyString(), anyBoolean(), - any(), any(), any()); - doReturn(LEGACY).when(spy).getBucketLayout(VOLUME_NAME, BUCKET_NAME, - omMetadataManager); - - spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, fromSnapInfo.getName(), - toSnapInfo.getName(), 0, 1000, false, false); - - Thread.sleep(1000L); - spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, fromSnapInfo.getName(), - toSnapInfo.getName(), 0, 1000, false, false); - - SnapshotDiffJob snapDiffJob = getSnapshotDiffJobFromDb(fromSnapInfo, - toSnapInfo); - assertEquals(DONE, snapDiffJob.getStatus()); - assertEquals(10L, snapDiffJob.getTotalDiffEntries()); - } - /** * Tests that only QUEUED jobs are submitted to the executor and rest are * short-circuited based on previous one. 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestCompositeDeltaDiffComputer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestCompositeDeltaDiffComputer.java new file mode 100644 index 000000000000..e8af3f84dd72 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestCompositeDeltaDiffComputer.java @@ -0,0 +1,726 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot.diff.delta; + +import static org.apache.hadoop.hdds.utils.IOUtils.getINode; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.TablePrefixInfo; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmSnapshot; +import org.apache.hadoop.ozone.om.OmSnapshotManager; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus; +import org.apache.ozone.rocksdb.util.SstFileInfo; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import 
org.mockito.MockitoAnnotations; + +/** + * Unit tests for CompositeDeltaDiffComputer using Mockito.mockConstruction() + * to properly isolate and test fallback logic. + */ +public class TestCompositeDeltaDiffComputer { + + @TempDir + private Path tempDir; + + @Mock + private OmSnapshotManager omSnapshotManager; + + @Mock + private OMMetadataManager activeMetadataManager; + + @Mock + private OmSnapshotLocalDataManager localDataManager; + + @Mock + private RDBStore rdbStore; + + @Mock + private RocksDBCheckpointDiffer differ; + + @Mock + private Consumer activityReporter; + + private AutoCloseable mocks; + private Path deltaDirPath; + + @BeforeEach + public void setUp() throws IOException { + mocks = MockitoAnnotations.openMocks(this); + deltaDirPath = tempDir.resolve("delta"); + when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(localDataManager); + when(activeMetadataManager.getStore()).thenReturn(rdbStore); + when(rdbStore.getRocksDBCheckpointDiffer()).thenReturn(differ); + } + + @AfterEach + public void tearDown() throws Exception { + if (mocks != null) { + mocks.close(); + } + } + + /** + * Tests that RDBDifferComputer is created when fullDiff=false using mockConstruction. 
+ */ + @Test + public void testRDBDifferComputerCreatedWhenNotFullDiff() throws IOException { + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + // Verify RDBDifferComputer was constructed (fullDiff=false) + assertEquals(1, rdbDifferMock.constructed().size(), "RDBDifferComputer should be constructed"); + assertEquals(1, fullDiffMock.constructed().size(), "FullDiffComputer should always be constructed"); + + composite.close(); + } + } + + /** + * Tests that RDBDifferComputer is NOT created when fullDiff=true using mockConstruction. + */ + @Test + public void testRDBDifferComputerNotCreatedWhenFullDiff() throws IOException { + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, true, false); + + // Verify RDBDifferComputer was NOT constructed (fullDiff=true) + assertEquals(0, rdbDifferMock.constructed().size(), "RDBDifferComputer should NOT " + + "be constructed when fullDiff=true"); + assertEquals(1, fullDiffMock.constructed().size(), "FullDiffComputer should always be constructed"); + + composite.close(); + } + } + + /** + * Tests successful RDBDifferComputer computation without fallback. 
+ */ + @Test + public void testSuccessfulRDBDifferComputationWithoutFallback() throws IOException { + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + // Create expected results from RDBDiffer + Path sstFile1 = tempDir.resolve("rdb1.sst"); + Path sstFile2 = tempDir.resolve("rdb2.sst"); + Files.createFile(sstFile1); + Files.createFile(sstFile2); + SstFileInfo sstInfo1 = new SstFileInfo("rdb1.sst", "key1", "key2", "keyTable"); + SstFileInfo sstInfo2 = new SstFileInfo("rdb2.sst", "key3", "key4", "keyTable"); + Map> rdbDifferResult = new HashMap<>(); + rdbDifferResult.put(sstFile1, Pair.of(sstFile1, sstInfo1)); + rdbDifferResult.put(sstFile2, Pair.of(sstFile2, sstInfo2)); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + // Make RDBDifferComputer return results successfully + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(rdbDifferResult)); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + // Verify RDBDiffer results are returned + assertTrue(result.isPresent(), "Result should be present from RDBDiffer"); + assertEquals(2, result.get().size(), "Should have 2 files from RDBDiffer"); + assertEquals(rdbDifferResult, result.get(), "Should return RDBDifferComputer result"); + + // Verify 
RDBDifferComputer was called but NOT FullDiffComputer + RDBDifferComputer rdbDifferInstance = rdbDifferMock.constructed().get(0); + verify(rdbDifferInstance, times(1)).computeDeltaFiles(any(), any(), anySet(), any()); + + // Verify FullDiffComputer was NEVER called (no fallback needed) + FullDiffComputer fullDiffInstance = fullDiffMock.constructed().get(0); + verify(fullDiffInstance, times(0)).computeDeltaFiles(any(), any(), anySet(), any()); + + // Verify only DAG_WALK status was reported (no FULL_DIFF) + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(SubStatus.class); + verify(activityReporter, times(1)).accept(statusCaptor.capture()); + assertEquals(SubStatus.SST_FILE_DELTA_DAG_WALK, statusCaptor.getValue(), + "Only DAG_WALK should be reported when RDBDiffer succeeds"); + + composite.close(); + } + } + + /** + * Tests successful RDBDifferComputer with single file. + */ + @Test + public void testSuccessfulRDBDifferWithSingleFile() throws IOException { + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + Path sstFile = tempDir.resolve("single.sst"); + Files.createFile(sstFile); + SstFileInfo sstInfo = new SstFileInfo("single.sst", "key1", "key5", "keyTable"); + Map> rdbDifferResult = new HashMap<>(); + rdbDifferResult.put(sstFile, Pair.of(sstFile, sstInfo)); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(rdbDifferResult)); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + CompositeDeltaDiffComputer composite 
= new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + assertTrue(result.isPresent(), "Result should be present"); + assertEquals(1, result.get().size(), "Should have 1 file"); + + // Verify no fallback to FullDiff + FullDiffComputer fullDiffInstance = fullDiffMock.constructed().get(0); + verify(fullDiffInstance, times(0)).computeDeltaFiles(any(), any(), anySet(), any()); + + composite.close(); + } + } + + /** + * Tests successful RDBDifferComputer with multiple tables. + */ + @Test + public void testSuccessfulRDBDifferWithMultipleTables() throws IOException { + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable", "fileTable", "directoryTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of( + "keyTable", "a", "fileTable", "b", "directoryTable", "c")); + + // Create files for different tables + Path keyFile = tempDir.resolve("key1.sst"); + Path fileFile = tempDir.resolve("file1.sst"); + Path dirFile = tempDir.resolve("dir1.sst"); + Files.createFile(keyFile); + Files.createFile(fileFile); + Files.createFile(dirFile); + + SstFileInfo keyInfo = new SstFileInfo("key1.sst", "key1", "key2", "keyTable"); + SstFileInfo fileInfo = new SstFileInfo("file1.sst", "file1", "file2", "fileTable"); + SstFileInfo dirInfo = new SstFileInfo("dir1.sst", "dir1", "dir2", "directoryTable"); + + Map> rdbDifferResult = new HashMap<>(); + rdbDifferResult.put(keyFile, Pair.of(keyFile, keyInfo)); + rdbDifferResult.put(fileFile, Pair.of(fileFile, fileInfo)); + rdbDifferResult.put(dirFile, Pair.of(dirFile, dirInfo)); + 
+ try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(rdbDifferResult)); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + assertTrue(result.isPresent(), "Result should be present"); + assertEquals(3, result.get().size(), "Should have 3 files from different tables"); + + // Verify RDBDiffer handled all tables without fallback + RDBDifferComputer rdbDifferInstance = rdbDifferMock.constructed().get(0); + verify(rdbDifferInstance, times(1)).computeDeltaFiles(any(), any(), anySet(), any()); + + FullDiffComputer fullDiffInstance = fullDiffMock.constructed().get(0); + verify(fullDiffInstance, times(0)).computeDeltaFiles(any(), any(), anySet(), any()); + + composite.close(); + } + } + + /** + * Tests successful RDBDifferComputer returning empty map (no changes). 
+ */ + @Test + public void testSuccessfulRDBDifferWithNoChanges() throws IOException { + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + // RDBDiffer returns empty map (no differences, but successful computation) + Map> emptyResult = new HashMap<>(); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(emptyResult)); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + // Empty result is still a valid success case - no fallback needed + assertTrue(result.isPresent(), "Result should be present even if empty"); + assertEquals(0, result.get().size(), "Should have 0 files (no changes)"); + + // Verify no fallback occurred + FullDiffComputer fullDiffInstance = fullDiffMock.constructed().get(0); + verify(fullDiffInstance, times(0)).computeDeltaFiles(any(), any(), anySet(), any()); + + // Only DAG_WALK status should be reported + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(SubStatus.class); + verify(activityReporter, times(1)).accept(statusCaptor.capture()); + assertEquals(SubStatus.SST_FILE_DELTA_DAG_WALK, statusCaptor.getValue()); + + composite.close(); + } + } + + /** + * Tests fallback from RDBDifferComputer to FullDiffComputer using 
mockConstruction. + */ + @Test + public void testFallbackFromRDBDifferToFullDiff() throws IOException { + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + // Create expected results + Path sstFile = tempDir.resolve("test.sst"); + Files.createFile(sstFile); + SstFileInfo sstInfo = new SstFileInfo("test.sst", "key1", "key2", "keyTable"); + Map> fullDiffResult = new HashMap<>(); + fullDiffResult.put(sstFile, Pair.of(sstFile, sstInfo)); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + // Make RDBDifferComputer return empty to trigger fallback + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.empty()); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class, + (mock, context) -> { + // Make FullDiffComputer return results + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(fullDiffResult)); + })) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + // Verify fallback occurred + assertTrue(result.isPresent(), "Result should be present from fallback"); + assertEquals(fullDiffResult, result.get(), "Should return FullDiffComputer result"); + + // Verify both computers were called + RDBDifferComputer rdbDifferInstance = rdbDifferMock.constructed().get(0); + FullDiffComputer fullDiffInstance = 
fullDiffMock.constructed().get(0); + + verify(rdbDifferInstance, times(1)).computeDeltaFiles(any(), any(), anySet(), any()); + verify(fullDiffInstance, times(1)).computeDeltaFiles(any(), any(), anySet(), any()); + + // Verify activity statuses were reported + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(SubStatus.class); + verify(activityReporter, times(2)).accept(statusCaptor.capture()); + List statuses = statusCaptor.getAllValues(); + assertEquals(SubStatus.SST_FILE_DELTA_DAG_WALK, statuses.get(0)); + assertEquals(SubStatus.SST_FILE_DELTA_FULL_DIFF, statuses.get(1)); + + composite.close(); + } + } + + /** + * Tests fallback on exception using mockConstruction. + */ + @Test + public void testFallbackOnException() throws IOException { + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + Path sstFile = tempDir.resolve("test2.sst"); + Files.createFile(sstFile); + SstFileInfo sstInfo = new SstFileInfo("test2.sst", "key3", "key4", "keyTable"); + Map> fullDiffResult = new HashMap<>(); + fullDiffResult.put(sstFile, Pair.of(sstFile, sstInfo)); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + // Make RDBDifferComputer throw exception to trigger fallback + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenThrow(new RuntimeException("Test exception")); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class, + (mock, context) -> { + // Make FullDiffComputer return results + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(fullDiffResult)); + })) { + + 
CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + // Verify fallback occurred + assertTrue(result.isPresent(), "Result should be present from fallback after exception"); + + // Verify activity statuses + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(SubStatus.class); + verify(activityReporter, times(2)).accept(statusCaptor.capture()); + List statuses = statusCaptor.getAllValues(); + assertEquals(SubStatus.SST_FILE_DELTA_DAG_WALK, statuses.get(0)); + assertEquals(SubStatus.SST_FILE_DELTA_FULL_DIFF, statuses.get(1)); + + composite.close(); + } + } + + /** + * Tests that FullDiffComputer is used directly when fullDiff=true. + */ + @Test + public void testFullDiffOnlyMode() throws IOException { + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + Path sstFile = tempDir.resolve("test3.sst"); + Files.createFile(sstFile); + SstFileInfo sstInfo = new SstFileInfo("test3.sst", "key5", "key6", "keyTable"); + Map> fullDiffResult = new HashMap<>(); + fullDiffResult.put(sstFile, Pair.of(sstFile, sstInfo)); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class, + (mock, context) -> { + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(fullDiffResult)); + })) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + 
omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, true, false); + + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + // Verify RDBDifferComputer was never constructed or called + assertEquals(0, rdbDifferMock.constructed().size(), "RDBDifferComputer should not be constructed"); + + // Verify FullDiffComputer was used + assertTrue(result.isPresent(), "Result should be present"); + FullDiffComputer fullDiffInstance = fullDiffMock.constructed().get(0); + verify(fullDiffInstance, times(1)).computeDeltaFiles(any(), any(), anySet(), any()); + + // Verify only FULL_DIFF status was reported + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(SubStatus.class); + verify(activityReporter, times(1)).accept(statusCaptor.capture()); + assertEquals(SubStatus.SST_FILE_DELTA_FULL_DIFF, statusCaptor.getValue()); + + composite.close(); + } + } + + /** + * Tests proper cleanup of both computers. + */ + @Test + public void testCloseCallsBothComputers() throws IOException { + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + composite.close(); + + // Verify close was called on both + RDBDifferComputer rdbDifferInstance = rdbDifferMock.constructed().get(0); + FullDiffComputer fullDiffInstance = fullDiffMock.constructed().get(0); + + verify(rdbDifferInstance, times(1)).close(); + verify(fullDiffInstance, times(1)).close(); + } + } + + /** + * Tests that nonNativeDiff flag is properly passed to constructor. + * Verifies CompositeDeltaDiffComputer can be created with nonNativeDiff=true. 
+ */ + @Test + public void testNonNativeDiffFlagInConstructor() throws IOException { + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + // Create with nonNativeDiff = true + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, true); + + // Verify construction succeeds and both computers are created + assertEquals(1, rdbDifferMock.constructed().size(), "RDBDifferComputer should be created"); + assertEquals(1, fullDiffMock.constructed().size(), "FullDiffComputer should be created"); + + composite.close(); + } + } + + /** + * Tests that nonNativeDiff flag works correctly when disabled. + * Verifies CompositeDeltaDiffComputer can be created with nonNativeDiff=false. + */ + @Test + public void testNonNativeDiffDisabled() throws IOException { + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + // Create with nonNativeDiff = false (default behavior) + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + // Verify construction succeeds and both computers are created + assertEquals(1, rdbDifferMock.constructed().size(), "RDBDifferComputer should be created"); + assertEquals(1, fullDiffMock.constructed().size(), "FullDiffComputer should be created"); + + composite.close(); + } + } + + /** + * Tests nonNativeDiff mode with computeDeltaFiles - verifies fromSnapshot files are added. + * In nonNativeDiff mode, SST files from fromSnapshot are added to the delta to handle deletes. 
+ */ + @Test + public void testNonNativeDiffComputeDeltaFilesEnabled() throws IOException { + // Given nonNativeDiff is enabled and we have snapshots + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + // Setup fromSnapshot SST files + Path fromDbPath = tempDir.resolve("fromDb"); + Files.createDirectories(fromDbPath); + Path fromSstFile1 = fromDbPath.resolve("000001.sst"); + Path fromSstFile2 = fromDbPath.resolve("000002.sst"); + Files.createFile(fromSstFile1); + Files.createFile(fromSstFile2); + + SstFileInfo fromSstInfo1 = new SstFileInfo("000001", "a/key1", "a/key100", "keyTable"); + SstFileInfo fromSstInfo2 = new SstFileInfo("000002", "a/key101", "a/key200", "keyTable"); + Set fromSnapshotSstFiles = ImmutableSet.of(fromSstInfo1, fromSstInfo2); + + // Mock fromSnapshot + OmSnapshot fromSnap = org.mockito.Mockito.mock(OmSnapshot.class); + OMMetadataManager fromMetaMgr = org.mockito.Mockito.mock(OMMetadataManager.class); + RDBStore fromRdbStore = org.mockito.Mockito.mock(RDBStore.class); + when(fromSnap.getMetadataManager()).thenReturn(fromMetaMgr); + when(fromMetaMgr.getStore()).thenReturn(fromRdbStore); + when(fromRdbStore.getDbLocation()).thenReturn(fromDbPath.toFile()); + + @SuppressWarnings("unchecked") + UncheckedAutoCloseableSupplier fromSnapSupplier = + (UncheckedAutoCloseableSupplier) org.mockito.Mockito.mock(UncheckedAutoCloseableSupplier.class); + when(fromSnapSupplier.get()).thenReturn(fromSnap); + when(omSnapshotManager.getActiveSnapshot(eq("vol1"), eq("bucket1"), eq("snap1"))) + .thenReturn(fromSnapSupplier); + + // Mock RDBDifferComputer to return a result + Map> rdbDifferResult = new 
HashMap<>(); + Path toSstFile = tempDir.resolve("000003.sst"); + Files.createFile(toSstFile); + SstFileInfo toSstInfo = new SstFileInfo("000003.sst", "a/key1", "a/key50", "keyTable"); + rdbDifferResult.put(toSstFile, Pair.of(deltaDirPath.resolve("000003.sst"), toSstInfo)); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(rdbDifferResult)); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class); + MockedStatic fullDiffStaticMock = mockStatic(FullDiffComputer.class)) { + + // Mock the static method getSSTFileSetForSnapshot + fullDiffStaticMock.when(() -> FullDiffComputer.getSSTFileSetForSnapshot(any(), anySet(), any())) + .thenReturn(fromSnapshotSstFiles); + + // When we create CompositeDeltaDiffComputer with nonNativeDiff=true + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, true); + + // Then computeDeltaFiles should complete successfully and include fromSnapshot files + Optional>> result = + composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + // Result should be present with both RDBDiffer result AND fromSnapshot files + assertTrue(result.isPresent(), "Result should be present"); + Map> deltaFiles = result.get(); + + // Should have 1 from RDBDiffer + 2 from fromSnapshot = 3 total + assertEquals(3, deltaFiles.size(), + "Should have 3 files (1 RDBDiffer + 2 fromSnapshot), got: " + deltaFiles.size()); + assertEquals(ImmutableSet.of(fromSstFile1, fromSstFile2, toSstFile), deltaFiles.keySet()); + Map infoMap = ImmutableMap.of(fromSstFile1, fromSstInfo1, fromSstFile2, fromSstInfo2, + toSstFile, toSstInfo); + for (Map.Entry> entry : deltaFiles.entrySet()) { + assertEquals(infoMap.get(entry.getKey()), entry.getValue().getRight()); + 
assertEquals(deltaDirPath.toAbsolutePath(), entry.getValue().getLeft().toAbsolutePath().getParent()); + } + assertEquals(getINode(fromSstFile1), getINode(deltaFiles.get(fromSstFile1).getLeft())); + assertEquals(getINode(fromSstFile2), getINode(deltaFiles.get(fromSstFile2).getLeft())); + + composite.close(); + } + } + + /** + * Tests nonNativeDiff mode disabled with computeDeltaFiles. + * Verifies normal behavior when nonNativeDiff=false. + */ + @Test + public void testNonNativeDiffComputeDeltaFilesDisabled() throws IOException { + // Given nonNativeDiff is disabled + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = new TablePrefixInfo(ImmutableMap.of("keyTable", "a")); + + // Mock RDBDifferComputer to return a result + Map> rdbDifferResult = new HashMap<>(); + Path sstFile = tempDir.resolve("000001.sst"); + Files.createFile(sstFile); + SstFileInfo sstInfo = new SstFileInfo("000001.sst", "a/key1", "a/key50", "keyTable"); + rdbDifferResult.put(sstFile, Pair.of(deltaDirPath.resolve("000001.sst"), sstInfo)); + + try (MockedConstruction rdbDifferMock = mockConstruction(RDBDifferComputer.class, + (mock, context) -> { + when(mock.computeDeltaFiles(any(), any(), anySet(), any())) + .thenReturn(Optional.of(rdbDifferResult)); + }); + MockedConstruction fullDiffMock = mockConstruction(FullDiffComputer.class)) { + + // When we create CompositeDeltaDiffComputer with nonNativeDiff=false + CompositeDeltaDiffComputer composite = new CompositeDeltaDiffComputer( + omSnapshotManager, activeMetadataManager, deltaDirPath, activityReporter, false, false); + + // Then computeDeltaFiles should complete successfully with RDBDiffer result + Optional>> result = + 
composite.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + // Result should contain RDBDiffer result + assertTrue(result.isPresent(), "Result should be present"); + Map> deltaFiles = result.get(); + assertEquals(1, deltaFiles.size(), "Should have RDBDiffer result"); + assertTrue(deltaFiles.containsKey(sstFile), "Should contain the SST file"); + + composite.close(); + } + } + + // Helper methods + + private SnapshotInfo createMockSnapshotInfo(String volumeName, String bucketName, + String snapshotName, UUID snapshotId) { + SnapshotInfo.Builder builder = SnapshotInfo.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setName(snapshotName) + .setSnapshotId(snapshotId) + .setDbTxSequenceNumber(100L); + return builder.build(); + } +} + diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java new file mode 100644 index 000000000000..b4ba058a43c2 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot.diff.delta; + +import static org.apache.hadoop.hdds.utils.IOUtils.getINode; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.TablePrefixInfo; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmSnapshotLocalData; +import org.apache.hadoop.ozone.om.OmSnapshotLocalData.VersionMeta; +import org.apache.hadoop.ozone.om.OmSnapshotManager; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus; +import 
org.apache.ozone.rocksdb.util.SstFileInfo; +import org.apache.ozone.rocksdiff.DifferSnapshotInfo; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Unit tests for RDBDifferComputer. + */ +public class TestRDBDifferComputer { + + @TempDir + private Path tempDir; + + @Mock + private OmSnapshotManager omSnapshotManager; + + @Mock + private OMMetadataManager activeMetadataManager; + + @Mock + private OmSnapshotLocalDataManager localDataManager; + + @Mock + private RDBStore rdbStore; + + @Mock + private RocksDBCheckpointDiffer differ; + + @Mock + private Consumer activityReporter; + + private AutoCloseable mocks; + private Path deltaDirPath; + private RDBDifferComputer rdbDifferComputer; + + @BeforeEach + public void setUp() throws IOException { + mocks = MockitoAnnotations.openMocks(this); + deltaDirPath = tempDir.resolve("delta"); + when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(localDataManager); + when(activeMetadataManager.getStore()).thenReturn(rdbStore); + when(rdbStore.getRocksDBCheckpointDiffer()).thenReturn(differ); + } + + @AfterEach + public void tearDown() throws Exception { + if (rdbDifferComputer != null) { + rdbDifferComputer.close(); + } + if (mocks != null) { + mocks.close(); + } + } + + /** + * Tests that the constructor creates RDBDifferComputer successfully with differ. 
+ */ + @Test + public void testConstructorWithDiffer() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + assertNotNull(rdbDifferComputer, "RDBDifferComputer should be created"); + assertTrue(Files.exists(deltaDirPath), "Delta directory should be created"); + verify(activeMetadataManager, times(1)).getStore(); + verify(rdbStore, times(1)).getRocksDBCheckpointDiffer(); + } + + /** + * Tests constructor when differ is null (fallback scenario). + */ + @Test + public void testConstructorWithNullDiffer() throws IOException { + when(rdbStore.getRocksDBCheckpointDiffer()).thenReturn(null); + + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + assertNotNull(rdbDifferComputer, "RDBDifferComputer should be created even with null differ"); + assertTrue(Files.exists(deltaDirPath), "Delta directory should be created"); + } + + /** + * Tests computeDeltaFiles with successful differ computation. 
+ */ + @Test + public void testComputeDeltaFilesWithDiffer() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + // Mock snapshot local data + ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData fromSnapshotLocalData = createMockSnapshotLocalData(fromSnapshotId, 1); + OmSnapshotLocalData toSnapshotLocalData = createMockSnapshotLocalData(toSnapshotId, 2); + + when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(fromSnapshotLocalData); + when(snapProvider.getSnapshotLocalData()).thenReturn(toSnapshotLocalData); + when(localDataManager.getOmSnapshotLocalData(toSnapshotId, fromSnapshotId)).thenReturn(snapProvider); + + // Create mock SST files + Path sstFile1 = tempDir.resolve("sst1.sst"); + Path sstFile2 = tempDir.resolve("sst2.sst"); + Files.createFile(sstFile1); + Files.createFile(sstFile2); + + SstFileInfo sstFileInfo1 = new SstFileInfo("sst1.sst", "key1", "key2", "keyTable"); + SstFileInfo sstFileInfo2 = new SstFileInfo("sst2.sst", "key3", "key4", "keyTable"); + + Map differResult = new HashMap<>(); + differResult.put(sstFile1, sstFileInfo1); + differResult.put(sstFile2, sstFileInfo2); + + when(differ.getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), any(DifferSnapshotInfo.class), + any(Map.class), any(TablePrefixInfo.class), anySet())).thenReturn(Optional.of(differResult)); + + Optional>> result = + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + 
assertTrue(result.isPresent(), "Result should be present"); + assertEquals(2, result.get().size(), "Should have 2 delta files"); + assertTrue(result.get().containsKey(sstFile1), "Should contain first SST file"); + assertTrue(result.get().containsKey(sstFile2), "Should contain second SST file"); + + // Verify links were created in delta directory + for (Map.Entry> entry : result.get().entrySet()) { + Path actualPath = entry.getKey(); + Path link = entry.getValue().getLeft(); + assertEquals(differResult.get(actualPath), entry.getValue().getValue()); + assertTrue(link.startsWith(deltaDirPath), "Link should be in delta directory"); + assertTrue(Files.exists(link), "Link should exist"); + assertEquals(getINode(actualPath), getINode(link)); + } + + verify(snapProvider, times(1)).close(); + } + + /** + * Tests computeDeltaFiles when differ returns empty. + */ + @Test + public void testComputeDeltaFilesWithEmptyDifferResult() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + // Mock snapshot local data + ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData fromSnapshotLocalData = createMockSnapshotLocalData(fromSnapshotId, 1); + OmSnapshotLocalData toSnapshotLocalData = createMockSnapshotLocalData(toSnapshotId, 2); + + when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(fromSnapshotLocalData); + when(snapProvider.getSnapshotLocalData()).thenReturn(toSnapshotLocalData); + 
when(localDataManager.getOmSnapshotLocalData(toSnapshotId, fromSnapshotId)).thenReturn(snapProvider); + + when(differ.getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), any(DifferSnapshotInfo.class), + any(Map.class), any(TablePrefixInfo.class), anySet())).thenReturn(Optional.empty()); + + Optional>> result = + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + assertFalse(result.isPresent(), "Result should be empty when differ returns empty"); + verify(snapProvider, times(1)).close(); + } + + /** + * Tests computeDeltaFiles when differ is null. + */ + @Test + public void testComputeDeltaFilesWithNullDiffer() throws IOException { + when(rdbStore.getRocksDBCheckpointDiffer()).thenReturn(null); + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", UUID.randomUUID()); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", UUID.randomUUID()); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + Optional>> result = + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + assertFalse(result.isPresent(), "Result should be empty when differ is null"); + } + + /** + * Tests computeDeltaFiles with multiple tables. 
+ */ + @Test + public void testComputeDeltaFilesWithMultipleTables() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable", "fileTable", "directoryTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + // Mock snapshot local data + ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData fromSnapshotLocalData = createMockSnapshotLocalData(fromSnapshotId, 1); + OmSnapshotLocalData toSnapshotLocalData = createMockSnapshotLocalData(toSnapshotId, 2); + + when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(fromSnapshotLocalData); + when(snapProvider.getSnapshotLocalData()).thenReturn(toSnapshotLocalData); + when(localDataManager.getOmSnapshotLocalData(toSnapshotId, fromSnapshotId)).thenReturn(snapProvider); + + // Create mock SST files for different tables + Path sstFile1 = tempDir.resolve("key1.sst"); + Path sstFile2 = tempDir.resolve("file1.sst"); + Path sstFile3 = tempDir.resolve("dir1.sst"); + Files.createFile(sstFile1); + Files.createFile(sstFile2); + Files.createFile(sstFile3); + + SstFileInfo sstFileInfo1 = new SstFileInfo("key1.sst", "key1", "key2", "keyTable"); + SstFileInfo sstFileInfo2 = new SstFileInfo("file1.sst", "file1", "file2", "fileTable"); + SstFileInfo sstFileInfo3 = new SstFileInfo("dir1.sst", "dir1", "dir2", "directoryTable"); + + Map differResult = new HashMap<>(); + differResult.put(sstFile1, sstFileInfo1); + differResult.put(sstFile2, sstFileInfo2); + differResult.put(sstFile3, sstFileInfo3); + + 
when(differ.getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), any(DifferSnapshotInfo.class), + any(Map.class), any(TablePrefixInfo.class), anySet())).thenReturn(Optional.of(differResult)); + + Optional>> result = + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + assertTrue(result.isPresent(), "Result should be present"); + assertEquals(3, result.get().size(), "Should have 3 delta files from different tables"); + } + + /** + * Tests computeDeltaFiles with version mapping. + */ + @Test + public void testComputeDeltaFilesWithVersionMapping() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + // Mock snapshot local data with version mapping + ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData fromSnapshotLocalData = createMockSnapshotLocalData(fromSnapshotId, 1); + OmSnapshotLocalData toSnapshotLocalData = createMockSnapshotLocalDataWithVersions(toSnapshotId, 2); + + when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(fromSnapshotLocalData); + when(snapProvider.getSnapshotLocalData()).thenReturn(toSnapshotLocalData); + when(localDataManager.getOmSnapshotLocalData(toSnapshotId, fromSnapshotId)).thenReturn(snapProvider); + + Path sstFile = tempDir.resolve("sst1.sst"); + Files.createFile(sstFile); + SstFileInfo sstFileInfo = new SstFileInfo("sst1.sst", "key1", "key2", "keyTable"); + + Map differResult = new HashMap<>(); + differResult.put(sstFile, 
sstFileInfo); + + when(differ.getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), any(DifferSnapshotInfo.class), + any(Map.class), any(TablePrefixInfo.class), anySet())).thenReturn(Optional.of(differResult)); + + Optional>> result = + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + + assertTrue(result.isPresent(), "Result should be present"); + + // Verify that version map was passed to differ + ArgumentCaptor> versionMapCaptor = ArgumentCaptor.forClass(Map.class); + verify(differ).getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), any(DifferSnapshotInfo.class), + versionMapCaptor.capture(), any(TablePrefixInfo.class), anySet()); + + Map capturedVersionMap = versionMapCaptor.getValue(); + assertNotNull(capturedVersionMap, "Version map should not be null"); + assertEquals(ImmutableMap.of(0, 0, 1, 0, 2, 1), capturedVersionMap); + } + + /** + * Tests that getDSIFromSI throws exception when no versions found. + */ + @Test + public void testGetDSIFromSIWithNoVersions() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + UUID snapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", snapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", UUID.randomUUID()); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + // Mock snapshot local data with empty versions + ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData fromSnapshotLocalData = mock(OmSnapshotLocalData.class); + OmSnapshotLocalData toSnapshotLocalData = createMockSnapshotLocalData(UUID.randomUUID(), 1); + + when(fromSnapshotLocalData.getSnapshotId()).thenReturn(snapshotId); + 
when(fromSnapshotLocalData.getVersionSstFileInfos()).thenReturn(Collections.emptyMap()); + + when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(fromSnapshotLocalData); + when(snapProvider.getSnapshotLocalData()).thenReturn(toSnapshotLocalData); + when(localDataManager.getOmSnapshotLocalData(any(UUID.class), any(UUID.class))).thenReturn(snapProvider); + + assertThrows(IOException.class, () -> + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo), + "Should throw IOException when no versions found"); + } + + /** + * Tests that close properly cleans up resources. + */ + @Test + public void testClose() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + assertTrue(Files.exists(deltaDirPath), "Delta directory should exist"); + + rdbDifferComputer.close(); + + assertFalse(Files.exists(deltaDirPath), "Delta directory should be cleaned up after close"); + } + + /** + * Tests computeDeltaFiles with IOException from differ. 
+ */ + @Test + public void testComputeDeltaFilesWithIOException() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + // Mock snapshot local data + ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData fromSnapshotLocalData = createMockSnapshotLocalData(fromSnapshotId, 1); + OmSnapshotLocalData toSnapshotLocalData = createMockSnapshotLocalData(toSnapshotId, 2); + + when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(fromSnapshotLocalData); + when(snapProvider.getSnapshotLocalData()).thenReturn(toSnapshotLocalData); + when(localDataManager.getOmSnapshotLocalData(toSnapshotId, fromSnapshotId)).thenReturn(snapProvider); + + when(differ.getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), any(DifferSnapshotInfo.class), + any(Map.class), any(TablePrefixInfo.class), anySet())) + .thenThrow(new IOException("Test exception")); + + assertThrows(IOException.class, () -> + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo), + "Should propagate IOException from differ"); + + verify(snapProvider, times(1)).close(); + } + + /** + * Tests that differ operations are synchronized. 
+ */ + @Test + public void testDifferSynchronization() throws IOException { + rdbDifferComputer = new RDBDifferComputer(omSnapshotManager, activeMetadataManager, + deltaDirPath, activityReporter); + + UUID fromSnapshotId = UUID.randomUUID(); + UUID toSnapshotId = UUID.randomUUID(); + SnapshotInfo fromSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap1", fromSnapshotId); + SnapshotInfo toSnapshot = createMockSnapshotInfo("vol1", "bucket1", "snap2", toSnapshotId); + Set tablesToLookup = ImmutableSet.of("keyTable"); + TablePrefixInfo tablePrefixInfo = mock(TablePrefixInfo.class); + + // Mock snapshot local data + ReadableOmSnapshotLocalDataProvider snapProvider = mock(ReadableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData fromSnapshotLocalData = createMockSnapshotLocalData(fromSnapshotId, 1); + OmSnapshotLocalData toSnapshotLocalData = createMockSnapshotLocalData(toSnapshotId, 2); + + when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(fromSnapshotLocalData); + when(snapProvider.getSnapshotLocalData()).thenReturn(toSnapshotLocalData); + when(localDataManager.getOmSnapshotLocalData(toSnapshotId, fromSnapshotId)).thenReturn(snapProvider); + + when(differ.getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), any(DifferSnapshotInfo.class), + any(Map.class), any(TablePrefixInfo.class), anySet())).thenReturn(Optional.empty()); + + // Multiple calls should work correctly (synchronized access to differ) + for (int i = 0; i < 3; i++) { + Optional>> result = + rdbDifferComputer.computeDeltaFiles(fromSnapshot, toSnapshot, tablesToLookup, tablePrefixInfo); + assertFalse(result.isPresent(), "Result should be empty"); + } + + verify(differ, times(3)).getSSTDiffListWithFullPath(any(DifferSnapshotInfo.class), + any(DifferSnapshotInfo.class), any(Map.class), any(TablePrefixInfo.class), anySet()); + } + + // Helper methods + + private SnapshotInfo createMockSnapshotInfo(String volumeName, String bucketName, + String snapshotName, UUID snapshotId) { + 
SnapshotInfo.Builder builder = SnapshotInfo.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setName(snapshotName) + .setSnapshotId(snapshotId) + .setDbTxSequenceNumber(100L); + return builder.build(); + } + + private OmSnapshotLocalData createMockSnapshotLocalData(UUID snapshotId, int version) { + OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class); + when(localData.getSnapshotId()).thenReturn(snapshotId); + + // Create version SST file info + List sstFiles = new ArrayList<>(); + sstFiles.add(new SstFileInfo("file1.sst", "key1", "key2", "keyTable")); + + VersionMeta versionMeta = new VersionMeta(0, sstFiles); + Map versionMap = new TreeMap<>(); + versionMap.put(version, versionMeta); + + when(localData.getVersionSstFileInfos()).thenReturn(versionMap); + when(localData.getVersion()).thenReturn(version); + + return localData; + } + + private OmSnapshotLocalData createMockSnapshotLocalDataWithVersions(UUID snapshotId, int version) { + OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class); + when(localData.getSnapshotId()).thenReturn(snapshotId); + + // Create multiple versions + Map versionMap = new TreeMap<>(); + for (int i = 0; i <= version; i++) { + List sstFiles = new ArrayList<>(); + sstFiles.add(new SstFileInfo("file" + i + ".sst", "key" + i, "key" + (i + 1), "keyTable")); + VersionMeta versionMeta = new VersionMeta(i > 0 ? i - 1 : 0, sstFiles); + versionMap.put(i, versionMeta); + } + + when(localData.getVersionSstFileInfos()).thenReturn(versionMap); + when(localData.getVersion()).thenReturn(version); + + return localData; + } +} + + + + +