diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java index 4195fa099438..2726ecc50645 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java @@ -218,7 +218,7 @@ private void init() throws Exception { // stop the deletion services so that keys can still be read keyManager.stop(); -// preFinalizationChecks(); + preFinalizationChecks(); finalizeOMUpgrade(); counter = new AtomicInteger(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index a1dbd2eaa706..c2d662eed50c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -59,6 +59,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO; +import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest; import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService; import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject; import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob; @@ -125,7 +127,7 @@ public final class OmSnapshotManager implements AutoCloseable { * | fromSnapshotId-toSnapshotId | SnapshotDiffJob | * |------------------------------------------------| */ - private static final String SNAP_DIFF_JOB_TABLE_NAME = + public static final String SNAP_DIFF_JOB_TABLE_NAME = "snap-diff-job-table"; /** @@ -137,7 +139,7 @@ public final class OmSnapshotManager implements AutoCloseable { * | jobId-index | DiffReportEntry | * |--------------------------------| */ - private static final String SNAP_DIFF_REPORT_TABLE_NAME = + public static final String SNAP_DIFF_REPORT_TABLE_NAME = "snap-diff-report-table"; /** @@ -359,7 +361,7 @@ public OmSnapshot load(@Nonnull String snapshotTableKey) }; } - private CodecRegistry createCodecRegistryForSnapDiff() { + private static CodecRegistry createCodecRegistryForSnapDiff() { final CodecRegistry.Builder registry = CodecRegistry.newBuilder(); // DiffReportEntry codec for Diff Report. registry.addCodec(SnapshotDiffReportOzone.DiffReportEntry.class, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index d4a759e2cd04..ac70c44d567b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -22,47 +22,21 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; - import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.commons.io.file.PathUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; -import org.apache.commons.io.file.PathUtils; -import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.utils.db.CodecRegistry; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.ozone.OFSPath; -import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool; import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; +import org.apache.hadoop.ozone.OFSPath; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -75,14 +49,14 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.helpers.WithObjectID; +import org.apache.hadoop.ozone.om.helpers.WithParentObjectId; import org.apache.hadoop.ozone.om.service.SnapshotDeletingService; import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject.SnapshotDiffObjectBuilder; -import org.apache.hadoop.ozone.om.helpers.WithParentObjectId; import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone; -import org.apache.hadoop.util.ClosableIterator; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; -import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus; +import org.apache.hadoop.util.ClosableIterator; import org.apache.ozone.rocksdb.util.ManagedSstFileReader; import org.apache.ozone.rocksdb.util.RdbUtil; import org.apache.ozone.rocksdiff.DifferSnapshotInfo; @@ -95,8 +69,38 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE; +import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.DELETE; +import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY; +import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.RENAME; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME; @@ -107,6 +111,9 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE; import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER; import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.getTableKey; import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive; @@ -119,9 +126,6 @@ import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED; -import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; -import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; - /** * Class to generate snapshot diff. */ @@ -173,7 +177,7 @@ public class SnapshotDiffManager implements AutoCloseable { private final boolean snapshotForceFullDiff; private final Optional sstDumpTool; - private Optional sstDumptoolExecService; + private Optional sstDumpToolExecService; @SuppressWarnings("parameternumber") public SnapshotDiffManager(ManagedRocksDB db, @@ -274,16 +278,16 @@ private Optional initSSTDumpTool( OMConfigKeys .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES); - this.sstDumptoolExecService = Optional.of(new ThreadPoolExecutor(0, + this.sstDumpToolExecService = Optional.of(new ThreadPoolExecutor(0, threadPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder() .setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d") .build(), new ThreadPoolExecutor.DiscardPolicy())); - return Optional.of(new ManagedSSTDumpTool(sstDumptoolExecService.get(), + return Optional.of(new ManagedSSTDumpTool(sstDumpToolExecService.get(), bufferSize)); } catch (NativeLibraryNotLoadedException e) { - this.sstDumptoolExecService.ifPresent(exec -> + this.sstDumpToolExecService.ifPresent(exec -> closeExecutorService(exec, "SstDumpToolExecutor")); } return Optional.empty(); @@ -346,12 +350,12 @@ private Map getTablePrefixes( String volumeId = String.valueOf(omMetadataManager.getVolumeId(volumeName)); String bucketId = String.valueOf( omMetadataManager.getBucketId(volumeName, bucketName)); - tablePrefixes.put(OmMetadataManagerImpl.KEY_TABLE, - OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName - + OM_KEY_PREFIX); - tablePrefixes.put(OmMetadataManagerImpl.FILE_TABLE, + tablePrefixes.put(KEY_TABLE, + OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName + + OM_KEY_PREFIX); + tablePrefixes.put(FILE_TABLE, OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX); - tablePrefixes.put(OmMetadataManagerImpl.DIRECTORY_TABLE, + tablePrefixes.put(DIRECTORY_TABLE, OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX); return tablePrefixes; } @@ -376,8 +380,10 @@ private DifferSnapshotInfo getDSIFromSI(SnapshotInfo snapshotInfo, getTablePrefixes(snapshotOMMM, volumeName, bucketName)); } - private Set getSSTFileListForSnapshot(OmSnapshot snapshot, - List tablesToLookUp) throws RocksDBException { + @VisibleForTesting + protected Set getSSTFileListForSnapshot(OmSnapshot snapshot, + List tablesToLookUp) + throws RocksDBException { return RdbUtil.getSSTFilesForComparison(snapshot .getMetadataManager().getStore().getDbLocation() .getPath(), tablesToLookUp); @@ -556,20 +562,40 @@ private static OFSPath getSnapshotRootPath(String volume, String bucket) { } @VisibleForTesting - SnapshotDiffReportOzone createPageResponse(final SnapshotDiffJob snapDiffJob, - final String volumeName, final String bucketName, - final String fromSnapshotName, final String toSnapshotName, - final int index, final int pageSize) throws IOException { + SnapshotDiffReportOzone createPageResponse( + final SnapshotDiffJob snapDiffJob, + final String volumeName, + final String bucketName, + final String fromSnapshotName, + final String toSnapshotName, + final int index, + final int pageSize + ) throws IOException { if (index < 0 || pageSize <= 0) { throw new IllegalArgumentException(String.format( "Index should be a number >= 0. Given index %d. Page size " + - "should be a positive number > 0. Given page size is %d", + "should be a positive number > 0. Given page size is %d", index, pageSize)); } - List diffReportList = new ArrayList<>(); OFSPath path = getSnapshotRootPath(volumeName, bucketName); + Pair, String> pageResponse = + createPageResponse(snapDiffJob, index, pageSize); + List diffReportList = pageResponse.getLeft(); + String tokenString = pageResponse.getRight(); + + return new SnapshotDiffReportOzone(path.toString(), volumeName, bucketName, + fromSnapshotName, toSnapshotName, diffReportList, tokenString); + } + + Pair, String> createPageResponse( + final SnapshotDiffJob snapDiffJob, + final int index, + final int pageSize + ) throws IOException { + List diffReportList = new ArrayList<>(); + boolean hasMoreEntries = true; byte[] lowerIndex = codecRegistry.asRawData(getReportKeyForIndex( @@ -597,9 +623,7 @@ SnapshotDiffReportOzone createPageResponse(final SnapshotDiffJob snapDiffJob, String nextTokenString = hasMoreEntries ? String.valueOf(idx) : null; checkReportsIntegrity(snapDiffJob, index, diffReportList.size()); - - return new SnapshotDiffReportOzone(path.toString(), volumeName, bucketName, - fromSnapshotName, toSnapshotName, diffReportList, nextTokenString); + return Pair.of(diffReportList, nextTokenString); } /** @@ -608,9 +632,10 @@ SnapshotDiffReportOzone createPageResponse(final SnapshotDiffJob snapDiffJob, * If check fails, it marks the job failed so that it is GC-ed by clean up * service and throws the exception to client. */ - private void checkReportsIntegrity(final SnapshotDiffJob diffJob, - final int pageStartIdx, - final int numberOfEntriesInPage) + @VisibleForTesting + void checkReportsIntegrity(final SnapshotDiffJob diffJob, + final int pageStartIdx, + final int numberOfEntriesInPage) throws IOException { if ((pageStartIdx >= diffJob.getTotalDiffEntries() && numberOfEntriesInPage != 0) || (pageStartIdx < @@ -757,7 +782,8 @@ private synchronized SnapshotDiffJob getSnapDiffReportStatus( return snapDiffJob; } - private boolean areDiffJobAndSnapshotsActive( + @VisibleForTesting + boolean areDiffJobAndSnapshotsActive( final String volumeName, final String bucketName, final String fromSnapshotName, final String toSnapshotName) throws IOException { @@ -780,13 +806,14 @@ private boolean areDiffJobAndSnapshotsActive( } @SuppressWarnings("methodlength") - private void generateSnapshotDiffReport(final String jobKey, - final String jobId, - final String volumeName, - final String bucketName, - final String fromSnapshotName, - final String toSnapshotName, - final boolean forceFullDiff) { + @VisibleForTesting + void generateSnapshotDiffReport(final String jobKey, + final String jobId, + final String volumeName, + final String bucketName, + final String fromSnapshotName, + final String toSnapshotName, + final boolean forceFullDiff) { LOG.info("Started snap diff report generation for volume: {} " + "bucket: {}, fromSnapshot: {} and toSnapshot: {}", volumeName, bucketName, fromSnapshotName, toSnapshotName); @@ -1043,13 +1070,14 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( } } + @VisibleForTesting @SuppressWarnings("checkstyle:ParameterNumber") void addToObjectIdMap(Table fsTable, Table tsTable, Set deltaFiles, boolean nativeRocksToolsLoaded, PersistentMap oldObjIdToKeyMap, PersistentMap newObjIdToKeyMap, - final PersistentMap objectIdToDiffObject, + PersistentMap objectIdToDiffObject, Optional> oldParentIds, Optional> newParentIds, Map tablePrefixes) throws IOException, @@ -1059,7 +1087,7 @@ void addToObjectIdMap(Table fsTable, } String tablePrefix = getTablePrefix(tablePrefixes, fsTable.getName()); boolean isDirectoryTable = - fsTable.getName().equals(OmMetadataManagerImpl.DIRECTORY_TABLE); + fsTable.getName().equals(DIRECTORY_TABLE); ManagedSstFileReader sstFileReader = new ManagedSstFileReader(deltaFiles); validateEstimatedKeyChangesAreInLimits(sstFileReader); @@ -1147,11 +1175,16 @@ private String getKeyOrDirectoryName(boolean isDirectory, return keyInfo.getKeyName(); } + @VisibleForTesting @SuppressWarnings("checkstyle:ParameterNumber") - Set getDeltaFiles(OmSnapshot fromSnapshot, OmSnapshot toSnapshot, - List tablesToLookUp, SnapshotInfo fsInfo, - SnapshotInfo tsInfo, boolean useFullDiff, - Map tablePrefixes, String diffDir) + Set getDeltaFiles(OmSnapshot fromSnapshot, + OmSnapshot toSnapshot, + List tablesToLookUp, + SnapshotInfo fsInfo, + SnapshotInfo tsInfo, + boolean useFullDiff, + Map tablePrefixes, + String diffDir) throws RocksDBException, IOException { // TODO: [SNAPSHOT] Refactor the parameter list final Set deltaFiles = new HashSet<>(); @@ -1316,22 +1349,19 @@ long generateDiffReport( String key = resolveAbsolutePath(isFSOBucket, newParentIdPathMap, newKeyName); DiffReportEntry entry = - SnapshotDiffReportOzone.getDiffReportEntry(DiffType.CREATE, - key); + SnapshotDiffReportOzone.getDiffReportEntry(CREATE, key); createDiffs.add(codecRegistry.asRawData(entry)); } else if (newKeyName == null) { // Key Deleted. String key = resolveAbsolutePath(isFSOBucket, oldParentIdPathMap, oldKeyName); DiffReportEntry entry = - SnapshotDiffReportOzone.getDiffReportEntry(DiffType.DELETE, - key); + SnapshotDiffReportOzone.getDiffReportEntry(DELETE, key); deleteDiffs.add(codecRegistry.asRawData(entry)); } else if (Arrays.equals(oldKeyName, newKeyName)) { // Key modified. String key = resolveAbsolutePath(isFSOBucket, newParentIdPathMap, newKeyName); DiffReportEntry entry = - SnapshotDiffReportOzone.getDiffReportEntry(DiffType.MODIFY, - key); + SnapshotDiffReportOzone.getDiffReportEntry(MODIFY, key); modifyDiffs.add(codecRegistry.asRawData(entry)); } else { // Key Renamed. String oldKey = resolveAbsolutePath(isFSOBucket, oldParentIdPathMap, @@ -1339,8 +1369,8 @@ long generateDiffReport( String newKey = resolveAbsolutePath(isFSOBucket, newParentIdPathMap, newKeyName); renameDiffs.add(codecRegistry.asRawData( - SnapshotDiffReportOzone.getDiffReportEntry(DiffType.RENAME, - oldKey, newKey))); + SnapshotDiffReportOzone.getDiffReportEntry(RENAME, oldKey, + newKey))); // Check if block location is same or not. If it is not same, // key must have been overridden as well. @@ -1349,8 +1379,7 @@ long generateDiffReport( // Here, oldKey name is returned as modified. Modified key name is // based on base snapshot (from snapshot). renameDiffs.add(codecRegistry.asRawData( - SnapshotDiffReportOzone.getDiffReportEntry(DiffType.MODIFY, - oldKey))); + SnapshotDiffReportOzone.getDiffReportEntry(MODIFY, oldKey))); } } @@ -1503,9 +1532,10 @@ private synchronized void updateJobStatusToDone(String jobKey, snapDiffJobTable.put(jobKey, snapshotDiffJob); } - private BucketLayout getBucketLayout(final String volume, - final String bucket, - final OMMetadataManager mManager) + @VisibleForTesting + protected BucketLayout getBucketLayout(final String volume, + final String bucket, + final OMMetadataManager mManager) throws IOException { final String bucketTableKey = mManager.getBucketKey(volume, bucket); return mManager.getBucketTable().get(bucketTableKey).getBucketLayout(); @@ -1541,7 +1571,7 @@ private String getTablePrefix(Map tablePrefixes, * check if the given key is in the bucket specified by tablePrefix map. */ boolean isKeyInBucket(String key, Map tablePrefixes, - String tableName) { + String tableName) { return key.startsWith(getTablePrefix(tablePrefixes, tableName)); } @@ -1556,7 +1586,8 @@ boolean isKeyInBucket(String key, Map tablePrefixes, * When client re-submits previously queued job, workflow will pick it and * execute it. */ - private void loadJobsOnStartUp() { + @VisibleForTesting + void loadJobsOnStartUp() { try (ClosableIterator> iterator = snapDiffJobTable.iterator()) { @@ -1589,7 +1620,7 @@ public void close() { if (snapDiffExecutor != null) { closeExecutorService(snapDiffExecutor, "SnapDiffExecutor"); } - this.sstDumptoolExecService.ifPresent(exec -> + this.sstDumpToolExecService.ifPresent(exec -> closeExecutorService(exec, "SstDumpToolExecutor")); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index fa7d99e5a22f..46f1773f7b63 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -24,20 +24,24 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException; import org.apache.hadoop.hdds.utils.db.CodecRegistry; -import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; @@ -48,29 +52,36 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject.SnapshotDiffObjectBuilder; import org.apache.hadoop.ozone.om.helpers.WithParentObjectId; +import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject.SnapshotDiffObjectBuilder; +import org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap; import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; -import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ClosableIterator; +import org.apache.hadoop.util.ExitUtil; import org.apache.ozone.rocksdb.util.ManagedSstFileReader; import org.apache.ozone.rocksdb.util.RdbUtil; import org.apache.ozone.rocksdiff.DifferSnapshotInfo; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.apache.ozone.rocksdiff.RocksDiffUtils; +import org.apache.ratis.util.ExitUtils; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; @@ -79,151 +90,315 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; +import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME; +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT; import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.SNAP_DIFF_JOB_TABLE_NAME; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.SNAP_DIFF_REPORT_TABLE_NAME; +import static org.apache.hadoop.ozone.om.helpers.BucketLayout.LEGACY; +import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.getTableKey; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone.getDiffReportEntryCodec; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.FAILED; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.any; /** - * Test class for SnapshotDiffManager Class. + * Tests for SnapshotDiffManager. */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class TestSnapshotDiffManager { - - @Mock - private ManagedRocksDB snapdiffDB; - + private static final String VOLUME_NAME = "volume"; + private static final String BUCKET_NAME = "bucket"; + private ManagedRocksDB db; + private ManagedDBOptions dbOptions; + private ManagedColumnFamilyOptions columnFamilyOptions; + private List columnFamilyHandles; + private ColumnFamilyHandle snapDiffJobTable; + private ColumnFamilyHandle snapDiffReportTable; + private SnapshotDiffManager snapshotDiffManager; + private final List jobStatuses = Arrays.asList(QUEUED, IN_PROGRESS, + DONE, REJECTED, FAILED); + + private SnapshotInfo snapshotInfo; + private final List snapshotNames = new ArrayList<>(); + private final List snapshotInfoList = new ArrayList<>(); + private final List snapDiffJobs = new ArrayList<>(); + @TempDir + private File dbDir; @Mock private RocksDBCheckpointDiffer differ; - + @Mock + private OMMetadataManager omMetadataManager; @Mock private OzoneManager ozoneManager; - - private LoadingCache snapshotCache; - @Mock - private ColumnFamilyHandle snapdiffJobCFH; - + private OzoneConfiguration configuration; @Mock - private ColumnFamilyHandle snapdiffReportCFH; - + private Table snapshotInfoTable; @Mock - private ManagedColumnFamilyOptions columnFamilyOptions; - + private Table bucketInfoTable; + @Mock + private Table keyInfoTable; + @Mock + private OmBucketInfo omBucketInfo; @Mock - private RocksDB rocksDB; + private RDBStore dbStore; + + private LoadingCache snapshotCache; @Mock private RocksIterator jobTableIterator; private static CodecRegistry codecRegistry; + private final BiFunction + generateSnapDiffJobKey = + (SnapshotInfo fromSnapshotInfo, SnapshotInfo toSnapshotInfo) -> + fromSnapshotInfo.getSnapshotId() + DELIMITER + + toSnapshotInfo.getSnapshotId(); + @BeforeAll public static void initCodecRegistry() { - // Integers are used for indexing persistent list. codecRegistry = CodecRegistry.newBuilder() - .addCodec(SnapshotDiffReportOzone.DiffReportEntry.class, - SnapshotDiffReportOzone.getDiffReportEntryCodec()) - .addCodec(SnapshotDiffJob.class, SnapshotDiffJob.getCodec()).build(); + .addCodec(DiffReportEntry.class, getDiffReportEntryCodec()) + .addCodec(SnapshotDiffJob.class, SnapshotDiffJob.getCodec()) + .build(); } - private DBStore getMockedDBStore(String dbStorePath) { - DBStore dbStore = mock(DBStore.class); - when(dbStore.getDbLocation()).thenReturn(new File(dbStorePath)); - return dbStore; - } + @BeforeEach + public void init() throws RocksDBException, IOException, ExecutionException { + ExitUtils.disableSystemExit(); + ExitUtil.disableSystemExit(); + + dbOptions = new ManagedDBOptions(); + dbOptions.setCreateIfMissing(true); + columnFamilyOptions = new ManagedColumnFamilyOptions(); + + List columnFamilyDescriptors = + Collections.singletonList(new ColumnFamilyDescriptor( + StringUtils.string2Bytes(DEFAULT_COLUMN_FAMILY_NAME), + columnFamilyOptions)); + + columnFamilyHandles = new ArrayList<>(); + + db = ManagedRocksDB.open(dbOptions, dbDir.getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles); + + snapDiffJobTable = db.get().createColumnFamily( + new ColumnFamilyDescriptor( + StringUtils.string2Bytes(SNAP_DIFF_JOB_TABLE_NAME), + columnFamilyOptions)); + snapDiffReportTable = db.get().createColumnFamily( + new ColumnFamilyDescriptor( + StringUtils.string2Bytes(SNAP_DIFF_REPORT_TABLE_NAME), + columnFamilyOptions)); + + columnFamilyHandles.add(snapDiffJobTable); + columnFamilyHandles.add(snapDiffReportTable); + + String snapshotNamePrefix = "snap-"; + String snapshotPath = "snapshotPath"; + String snapshotCheckpointDir = "snapshotCheckpointDir"; + UUID baseSnapshotId = UUID.randomUUID(); + String baseSnapshotName = snapshotNamePrefix + baseSnapshotId; + snapshotInfo = new SnapshotInfo.Builder() + .setSnapshotId(baseSnapshotId) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setName(baseSnapshotName) + .setSnapshotPath(snapshotPath) + .setCheckpointDir(snapshotCheckpointDir) + .build(); - private OmSnapshot getMockedOmSnapshot(String snapshot) { - OmSnapshot omSnapshot = Mockito.mock(OmSnapshot.class); - OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); - DBStore dbStore = getMockedDBStore(snapshot); - Mockito.when(omSnapshot.getName()).thenReturn(snapshot); - Mockito.when(omSnapshot.getMetadataManager()).thenReturn(metadataManager); - Mockito.when(metadataManager.getStore()).thenReturn(dbStore); - return omSnapshot; - } + for (JobStatus jobStatus : jobStatuses) { + UUID targetSnapshotId = UUID.randomUUID(); + String targetSnapshotName = snapshotNamePrefix + + targetSnapshotId; + SnapshotInfo targetSnapshot = new SnapshotInfo.Builder() + .setSnapshotId(targetSnapshotId) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setName(targetSnapshotName) + .setSnapshotPath(snapshotPath) + .setCheckpointDir(snapshotCheckpointDir) + .build(); + + SnapshotDiffJob diffJob = new SnapshotDiffJob(System.currentTimeMillis(), + UUID.randomUUID().toString(), jobStatus, VOLUME_NAME, BUCKET_NAME, + baseSnapshotName, targetSnapshotName, false, 0); + + snapshotNames.add(targetSnapshotName); + snapshotInfoList.add(targetSnapshot); + snapDiffJobs.add(diffJob); + } + + String bucketTableKey = + OM_KEY_PREFIX + VOLUME_NAME + OM_KEY_PREFIX + BUCKET_NAME; + + when(configuration + .getTimeDuration(OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME, + OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME_DEFAULT, + TimeUnit.MILLISECONDS)) + .thenReturn(OZONE_OM_SNAPSHOT_DIFF_JOB_DEFAULT_WAIT_TIME_DEFAULT); + when(configuration + .getBoolean(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF, + OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT)) + .thenReturn(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT); + when(configuration + .getLong(OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB, + OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB_DEFAULT + )) + .thenReturn( + OZONE_OM_SNAPSHOT_DIFF_MAX_ALLOWED_KEYS_CHANGED_PER_DIFF_JOB_DEFAULT + ); + when(configuration + .getInt(OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE, + OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT)) + .thenReturn(OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE_DEFAULT); + when(configuration + .getInt(OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE, + OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT)) + .thenReturn(OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_POOL_SIZE_DEFAULT); + when(configuration + .getStorageSize(OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE, + OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT, + StorageUnit.BYTES)) + .thenReturn(FileUtils.ONE_KB_BI.doubleValue()); + + for (int i = 0; i < jobStatuses.size(); i++) { + when(snapshotInfoTable.get(getTableKey(VOLUME_NAME, BUCKET_NAME, + snapshotNames.get(i)))).thenReturn(snapshotInfoList.get(i)); + } + + when(snapshotInfoTable.get(getTableKey(VOLUME_NAME, BUCKET_NAME, + baseSnapshotName))).thenReturn(snapshotInfo); + + when(dbStore.getDbLocation()).thenReturn(dbDir); + when(dbStore.getSnapshotMetadataDir()).thenReturn(dbDir.getAbsolutePath()); + when(omBucketInfo.getBucketLayout()).thenReturn(LEGACY); + when(bucketInfoTable.get(bucketTableKey)).thenReturn(omBucketInfo); + when(omMetadataManager.getStore()).thenReturn(dbStore); + when(omMetadataManager.getSnapshotInfoTable()) + .thenReturn(snapshotInfoTable); + when(omMetadataManager.getBucketTable()).thenReturn(bucketInfoTable); + when(omMetadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME)) + .thenReturn(bucketTableKey); + when(omMetadataManager.getKeyTable(LEGACY)).thenReturn(keyInfoTable); + when(ozoneManager.getConfiguration()).thenReturn(configuration); + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); - private SnapshotDiffManager getMockedSnapshotDiffManager(int cacheSize) - throws IOException { - when(snapdiffDB.get()).thenReturn(rocksDB); - when(rocksDB.newIterator(snapdiffJobCFH)) - .thenReturn(jobTableIterator); - when(rocksDB.newIterator(Mockito.eq(snapdiffJobCFH), - Mockito.any(ReadOptions.class))) - .thenReturn(jobTableIterator); CacheLoader loader = new CacheLoader() { + @NotNull @Override - public OmSnapshot load(String key) { + public OmSnapshot load(@NotNull String key) { return getMockedOmSnapshot(key); } }; - snapshotCache = CacheBuilder.newBuilder() - .maximumSize(cacheSize) - .build(loader); - Mockito.when(ozoneManager.getConfiguration()) - .thenReturn(new OzoneConfiguration()); - OMMetadataManager mockedMetadataManager = - Mockito.mock(OMMetadataManager.class); - RDBStore mockedRDBStore = Mockito.mock(RDBStore.class); - Path diffDir = Files.createTempDirectory("snapdiff_dir"); - Mockito.when(mockedRDBStore.getSnapshotMetadataDir()) - .thenReturn(diffDir.toString()); - Mockito.when(mockedMetadataManager.getStore()).thenReturn(mockedRDBStore); - Mockito.when(ozoneManager.getMetadataManager()) - .thenReturn(mockedMetadataManager); - SnapshotDiffManager snapshotDiffManager = Mockito.spy( - new SnapshotDiffManager(snapdiffDB, differ, ozoneManager, snapshotCache, - snapdiffJobCFH, snapdiffReportCFH, columnFamilyOptions, - codecRegistry)); - PersistentMap snapDiffJobTable = - new SnapshotTestUtils.StubbedPersistentMap<>(); - HddsWhiteboxTestUtils.setInternalState(snapshotDiffManager, - "snapDiffJobTable", snapDiffJobTable); - return snapshotDiffManager; + + snapshotCache = CacheBuilder.newBuilder().maximumSize(10).build(loader); + + snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager, + snapshotCache, snapDiffJobTable, snapDiffReportTable, + columnFamilyOptions, codecRegistry); + } + + @AfterEach + public void tearDown() { + if (columnFamilyHandles != null) { + columnFamilyHandles.forEach(IOUtils::closeQuietly); + } + + IOUtils.closeQuietly(db); + IOUtils.closeQuietly(dbOptions); + IOUtils.closeQuietly(columnFamilyOptions); + IOUtils.closeQuietly(snapshotDiffManager); + } + + private OmSnapshot getMockedOmSnapshot(String snapshot) { + OmSnapshot omSnapshot = mock(OmSnapshot.class); + when(omSnapshot.getName()).thenReturn(snapshot); + when(omSnapshot.getMetadataManager()).thenReturn(omMetadataManager); + when(omMetadataManager.getStore()).thenReturn(dbStore); + return omSnapshot; } private SnapshotInfo getMockedSnapshotInfo(UUID snapshotId) { - SnapshotInfo snapshotInfo = mock(SnapshotInfo.class); - Mockito.when(snapshotInfo.getSnapshotId()).thenReturn(snapshotId); - return snapshotInfo; + SnapshotInfo snapInfo = mock(SnapshotInfo.class); + when(snapInfo.getSnapshotId()).thenReturn(snapshotId); + return snapInfo; } @ParameterizedTest @ValueSource(ints = {1, 2, 5, 10, 100, 1000, 10000}) public void testGetDeltaFilesWithDag(int numberOfFiles) throws ExecutionException, RocksDBException, IOException { - - SnapshotDiffManager snapshotDiffManager = getMockedSnapshotDiffManager(10); UUID snap1 = UUID.randomUUID(); UUID snap2 = UUID.randomUUID(); @@ -231,18 +406,23 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) Set randomStrings = IntStream.range(0, numberOfFiles) .mapToObj(i -> RandomStringUtils.randomAlphabetic(10)) .collect(Collectors.toSet()); - Mockito.when(differ.getSSTDiffListWithFullPath(Mockito.any(), - Mockito.any(), Mockito.eq(diffDir))) - .thenReturn(Lists.newArrayList(randomStrings)); + + when(differ.getSSTDiffListWithFullPath( + any(DifferSnapshotInfo.class), + any(DifferSnapshotInfo.class), + eq(diffDir)) + ).thenReturn(Lists.newArrayList(randomStrings)); + SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); - SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); - Mockito.when(jobTableIterator.isValid()).thenReturn(false); + SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap2); + when(jobTableIterator.isValid()).thenReturn(false); Set deltaFiles = snapshotDiffManager.getDeltaFiles( snapshotCache.get(snap1.toString()), snapshotCache.get(snap2.toString()), - Arrays.asList("cf1", "cf2"), fromSnapshotInfo, toSnapshotInfo, false, - Collections.EMPTY_MAP, diffDir); - Assertions.assertEquals(randomStrings, deltaFiles); + Arrays.asList("cf1", "cf2"), fromSnapshotInfo, + toSnapshotInfo, false, + Collections.emptyMap(), diffDir); + assertEquals(randomStrings, deltaFiles); } @ParameterizedTest @@ -257,9 +437,9 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, MockedStatic mockedRocksDiffUtils = Mockito.mockStatic(RocksDiffUtils.class)) { Set deltaStrings = new HashSet<>(); + mockedRdbUtil.when( - () -> RdbUtil.getSSTFilesForComparison(Matchers.anyString(), - Matchers.anyList())) + () -> RdbUtil.getSSTFilesForComparison(anyString(), anyList())) .thenAnswer((Answer>) invocation -> { Set retVal = IntStream.range(0, numberOfFiles) .mapToObj(i -> RandomStringUtils.randomAlphabetic(10)) @@ -267,39 +447,44 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, deltaStrings.addAll(retVal); return retVal; }); - mockedRocksDiffUtils.when(() -> RocksDiffUtils.filterRelevantSstFiles( - Matchers.anySet(), Matchers.anyMap())) + + mockedRocksDiffUtils.when(() -> + RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap())) .thenAnswer((Answer) invocationOnMock -> { invocationOnMock.getArgument(0, Set.class).stream() .findAny().ifPresent(val -> { - Assertions.assertTrue(deltaStrings.contains(val)); + assertTrue(deltaStrings.contains(val)); invocationOnMock.getArgument(0, Set.class).remove(val); deltaStrings.remove(val); }); return null; }); - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); + SnapshotDiffManager spy = spy(snapshotDiffManager); UUID snap1 = UUID.randomUUID(); UUID snap2 = UUID.randomUUID(); if (!useFullDiff) { Set randomStrings = Collections.emptySet(); - Mockito.when(differ.getSSTDiffListWithFullPath( - Mockito.any(DifferSnapshotInfo.class), - Mockito.any(DifferSnapshotInfo.class), - Matchers.anyString())) + when(differ.getSSTDiffListWithFullPath( + any(DifferSnapshotInfo.class), + any(DifferSnapshotInfo.class), + anyString())) .thenReturn(Lists.newArrayList(randomStrings)); } + SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); - Mockito.when(jobTableIterator.isValid()).thenReturn(false); - Set deltaFiles = snapshotDiffManager.getDeltaFiles( + when(jobTableIterator.isValid()).thenReturn(false); + Set deltaFiles = spy.getDeltaFiles( snapshotCache.get(snap1.toString()), snapshotCache.get(snap2.toString()), - Arrays.asList("cf1", "cf2"), fromSnapshotInfo, toSnapshotInfo, false, - Collections.EMPTY_MAP, Files.createTempDirectory("snapdiff_dir") - .toAbsolutePath().toString()); - Assertions.assertEquals(deltaStrings, deltaFiles); + Arrays.asList("cf1", "cf2"), + fromSnapshotInfo, + toSnapshotInfo, + false, + Collections.emptyMap(), + Files.createTempDirectory("snapdiff_dir").toAbsolutePath() + .toString()); + assertEquals(deltaStrings, deltaFiles); } } @@ -307,9 +492,9 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, Map map, String tableName) throws IOException { Table mocked = mock(Table.class); - Mockito.when(mocked.get(Matchers.any())) + when(mocked.get(any())) .thenAnswer(invocation -> map.get(invocation.getArgument(0))); - Mockito.when(mocked.getName()).thenReturn(tableName); + when(mocked.getName()).thenReturn(tableName); return mocked; } @@ -331,17 +516,12 @@ private WithParentObjectId getKeyInfo(int objectId, int updateId, /** * Test mocks the SSTFileReader to return object Ids from 0-50 * when not reading tombstones & Object Ids 0-100 when reading tombstones. - * Creating a mock snapshot table where the from Snapshot Table contains - * Object Ids in the range 0-25 & 50-100 and to Snaphshot Table contains data + * Creating a mock snapshot table where the fromSnapshot Table contains + * Object Ids in the range 0-25 & 50-100 and toSnapshot Table contains data * with object Ids in the range 0-50. * Function should return 25-50 in the new Persistent map. * In the case of reading tombstones old Snapshot Persistent map should have * object Ids in the range 50-100 & should be empty otherwise - * - * @param nativeLibraryLoaded - * @param snapshotTableName - * @throws NativeLibraryNotLoadedException - * @throws IOException */ @SuppressFBWarnings({"DLS_DEAD_LOCAL_STORE", "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"}) @@ -355,368 +535,303 @@ private WithParentObjectId getKeyInfo(int objectId, int updateId, public void testObjectIdMapWithTombstoneEntries(boolean nativeLibraryLoaded, String snapshotTableName) throws NativeLibraryNotLoadedException, IOException, RocksDBException { - // Mocking SST file with keys in SST file including tombstones - Set keysWithTombstones = IntStream.range(0, 100) + Set keysIncludingTombstones = IntStream.range(0, 100) .boxed().map(i -> (i + 100) + "/key" + i).collect(Collectors.toSet()); // Mocking SST file with keys in SST file excluding tombstones - Set keys = IntStream.range(0, 50).boxed() + Set keysExcludingTombstones = IntStream.range(0, 50).boxed() .map(i -> (i + 100) + "/key" + i).collect(Collectors.toSet()); + // Mocking SSTFileReader functions to return the above keys list. try (MockedConstruction mockedSSTFileReader = Mockito.mockConstruction(ManagedSstFileReader.class, (mock, context) -> { - Mockito.when(mock.getKeyStreamWithTombstone(Matchers.any())) - .thenReturn(keysWithTombstones.stream()); - Mockito.when(mock.getKeyStream()) - .thenReturn(keys.stream()); + when(mock.getKeyStreamWithTombstone(any())) + .thenReturn(keysIncludingTombstones.stream()); + when(mock.getKeyStream()) + .thenReturn(keysExcludingTombstones.stream()); }); MockedConstruction mockedSSTDumpTool = Mockito.mockConstruction(ManagedSSTDumpTool.class, (mock, context) -> { }) ) { - // Map toSnapshotTableMap = IntStream.concat(IntStream.range(0, 25), IntStream.range(50, 100)) .boxed().collect(Collectors.toMap(i -> (i + 100) + "/key" + i, i -> getKeyInfo(i, i, i + 100, snapshotTableName))); - // Mocking To snapshot table containing list of keys b/w 0-25, 50-100 Table toSnapshotTable = getMockedTable(toSnapshotTableMap, snapshotTableName); - // Mocking To snapshot table containing list of keys b/w 0-50 + Map fromSnapshotTableMap = IntStream.range(0, 50) .boxed().collect(Collectors.toMap(i -> (i + 100) + "/key" + i, i -> getKeyInfo(i, i, i + 100, snapshotTableName))); - // Expected Diff 25-50 are newly created keys & keys b/w are deleted, - // when reding keys with tombstones the keys would be added to - // objectIdsToBeChecked otherwise it wouldn't be added + Table fromSnapshotTable = getMockedTable(fromSnapshotTableMap, snapshotTableName); - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); - // Mocking to filter even keys in bucket. - // Odd keys should be filtered out in the diff. - Mockito.doAnswer((Answer) invocationOnMock -> - Integer.parseInt(invocationOnMock.getArgument(0, String.class) - .substring(7)) % 2 == 0).when(snapshotDiffManager) - .isKeyInBucket(Matchers.anyString(), Matchers.anyMap(), - Matchers.anyString()); + + snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager, + snapshotCache, snapDiffJobTable, snapDiffReportTable, + columnFamilyOptions, codecRegistry); + SnapshotDiffManager spy = spy(snapshotDiffManager); + + doAnswer(invocation -> { + String[] split = invocation.getArgument(0, String.class).split("/"); + String keyName = split[split.length - 1]; + return Integer.parseInt(keyName.substring(3)) % 2 == 0; + } + ).when(spy).isKeyInBucket(anyString(), anyMap(), anyString()); + PersistentMap oldObjectIdKeyMap = - new SnapshotTestUtils.StubbedPersistentMap<>(); + new StubbedPersistentMap<>(); PersistentMap newObjectIdKeyMap = new SnapshotTestUtils.StubbedPersistentMap<>(); PersistentMap objectIdsToCheck = new SnapshotTestUtils.StubbedPersistentMap<>(); + Set oldParentIds = Sets.newHashSet(); Set newParentIds = Sets.newHashSet(); - snapshotDiffManager.addToObjectIdMap(toSnapshotTable, + + spy.addToObjectIdMap(toSnapshotTable, fromSnapshotTable, Sets.newHashSet("dummy.sst"), nativeLibraryLoaded, oldObjectIdKeyMap, newObjectIdKeyMap, - objectIdsToCheck, Optional.ofNullable(oldParentIds), - Optional.ofNullable(newParentIds), + objectIdsToCheck, Optional.of(oldParentIds), + Optional.of(newParentIds), ImmutableMap.of(OmMetadataManagerImpl.DIRECTORY_TABLE, "", OmMetadataManagerImpl.KEY_TABLE, "", OmMetadataManagerImpl.FILE_TABLE, "")); - Iterator> oldObjectIdIter = - oldObjectIdKeyMap.iterator(); - int oldObjectIdCnt = 0; - while (oldObjectIdIter.hasNext()) { - Entry v = oldObjectIdIter.next(); - long objectId = this.codecRegistry.asObject(v.getKey(), Long.class); - Assertions.assertTrue(objectId % 2 == 0); - Assertions.assertTrue(objectId >= 50); - Assertions.assertTrue(objectId < 100); - oldObjectIdCnt += 1; + try (ClosableIterator> oldObjectIdIter = + oldObjectIdKeyMap.iterator()) { + int oldObjectIdCnt = 0; + while (oldObjectIdIter.hasNext()) { + Map.Entry v = oldObjectIdIter.next(); + long objectId = codecRegistry.asObject(v.getKey(), Long.class); + assertEquals(0, objectId % 2); + assertTrue(objectId >= 50); + assertTrue(objectId < 100); + oldObjectIdCnt += 1; + } + assertEquals(nativeLibraryLoaded ? 25 : 0, oldObjectIdCnt); } - Assertions.assertEquals(nativeLibraryLoaded ? 25 : 0, oldObjectIdCnt); - Iterator> newObjectIdIter = - newObjectIdKeyMap.iterator(); - int newObjectIdCnt = 0; - while (newObjectIdIter.hasNext()) { - Entry v = newObjectIdIter.next(); - long objectId = this.codecRegistry.asObject(v.getKey(), Long.class); - Assertions.assertTrue(objectId % 2 == 0); - Assertions.assertTrue(objectId >= 26); - Assertions.assertTrue(objectId < 50); - newObjectIdCnt += 1; + + try (ClosableIterator> newObjectIdIter = + newObjectIdKeyMap.iterator()) { + int newObjectIdCnt = 0; + while (newObjectIdIter.hasNext()) { + Map.Entry v = newObjectIdIter.next(); + long objectId = codecRegistry.asObject(v.getKey(), Long.class); + assertEquals(0, objectId % 2); + assertTrue(objectId >= 26); + assertTrue(objectId < 50); + newObjectIdCnt += 1; + } + assertEquals(12, newObjectIdCnt); } - Assertions.assertEquals(12, newObjectIdCnt); - - ClosableIterator> objectIdsToCheckIter = - objectIdsToCheck.iterator(); - int objectIdCnt = 0; - while (objectIdsToCheckIter.hasNext()) { - Entry entry = objectIdsToCheckIter.next(); - byte[] v = entry.getKey(); - long objectId = this.codecRegistry.asObject(v, Long.class); - Assertions.assertTrue(objectId % 2 == 0); - Assertions.assertTrue(objectId >= 26); - Assertions.assertTrue(objectId < (nativeLibraryLoaded ? 100 : 50)); - objectIdCnt += 1; + + try (ClosableIterator> + objectIdsToCheckIter = objectIdsToCheck.iterator()) { + int objectIdCnt = 0; + while (objectIdsToCheckIter.hasNext()) { + Entry entry = objectIdsToCheckIter.next(); + byte[] v = entry.getKey(); + long objectId = codecRegistry.asObject(v, Long.class); + assertEquals(0, objectId % 2); + assertTrue(objectId >= 26); + assertTrue(objectId < (nativeLibraryLoaded ? 100 : 50)); + objectIdCnt += 1; + } + assertEquals(nativeLibraryLoaded ? 37 : 12, objectIdCnt); } - Assertions.assertEquals(nativeLibraryLoaded ? 37 : 12, objectIdCnt); } } - /** - Testing generateDiffReport function by providing PersistentMap containing - objectId Map of diff keys to be checked with their corresponding key names. - */ @Test public void testGenerateDiffReport() throws IOException { - // Mocking RocksDbPersistentMap constructor to use stubbed - // implementation instead. - try (MockedConstruction mockedRocksDbPersistentMap = - Mockito.mockConstruction(RocksDbPersistentMap.class, - (mock, context) -> { - PersistentMap obj = - new SnapshotTestUtils.StubbedPersistentMap<>(); - when(mock.iterator()).thenReturn(obj.iterator()); - when(mock.iterator(Mockito.any(Optional.class), - Mockito.any(Optional.class))) - .thenAnswer(i -> obj.iterator(i.getArgument(0), - i.getArgument(1))); - when(mock.get(Matchers.any())) - .thenAnswer(i -> obj.get(i.getArgument(0))); - Mockito.doAnswer((Answer) i -> { - obj.put(i.getArgument(0), i.getArgument(1)); - return null; - }).when(mock).put(Matchers.any(), Matchers.any()); - }); - MockedConstruction mockedPersistentList = - Mockito.mockConstruction( - RocksDbPersistentList.class, (mock, context) -> { - PersistentList obj = - new SnapshotTestUtils.ArrayPersistentList<>(); - Mockito.when(mock.add(Matchers.any())) - .thenAnswer(i -> obj.add(i.getArgument(0))); - Mockito.when(mock.get(Matchers.anyInt())) - .thenAnswer(i -> obj.get(i.getArgument(0))); - Mockito.when(mock.addAll(Matchers.any(PersistentList.class))) - .thenAnswer(i -> obj.addAll(i.getArgument(0))); - Mockito.when(mock.iterator()) - .thenAnswer(i -> obj.iterator()); - })) { - PersistentMap oldObjectIdKeyMap = - new SnapshotTestUtils.StubbedPersistentMap<>(); - PersistentMap newObjectIdKeyMap = - new SnapshotTestUtils.StubbedPersistentMap<>(); - PersistentMap objectIdToDiffObject = - new SnapshotTestUtils.StubbedPersistentMap<>(); - Map diffMap = new HashMap<>(); - LongStream.range(0, 100).forEach(objectId -> { - try { - SnapshotDiffObjectBuilder builder = - new SnapshotDiffObjectBuilder(objectId); - String key = "key" + objectId; - byte[] objectIdVal = codecRegistry.asRawData(objectId); - byte[] keyBytes = codecRegistry.asRawData(key); - if (objectId >= 0 && objectId <= 25 || - objectId >= 50 && objectId <= 100) { - oldObjectIdKeyMap.put(objectIdVal, keyBytes); - builder.withOldKeyName(key); - } - if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0 || - objectId > 25 && objectId < 50) { - newObjectIdKeyMap.put(objectIdVal, keyBytes); - builder.withNewKeyName(key); - } - if (objectId >= 0 && objectId <= 25 && objectId % 4 == 1) { - String renamedKey = "renamed-key" + objectId; - byte[] renamedKeyBytes = codecRegistry.asRawData(renamedKey); - newObjectIdKeyMap.put(objectIdVal, renamedKeyBytes); - diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME); - builder.withOldKeyName(key); - builder.withNewKeyName(renamedKey); - } - objectIdToDiffObject.put(objectIdVal, builder.build()); - if (objectId >= 50 && objectId <= 100 || - objectId >= 0 && objectId <= 25 && objectId % 4 > 1) { - diffMap.put(objectId, SnapshotDiffReport.DiffType.DELETE); - } - if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0) { - diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY); - } - if (objectId > 25 && objectId < 50) { - diffMap.put(objectId, SnapshotDiffReport.DiffType.CREATE); - } - } catch (IOException e) { - throw new RuntimeException(e); + PersistentMap oldObjectIdKeyMap = + new StubbedPersistentMap<>(); + PersistentMap newObjectIdKeyMap = + new StubbedPersistentMap<>(); + PersistentMap objectIdToDiffObject = + new SnapshotTestUtils.StubbedPersistentMap<>(); + Map diffMap = new HashMap<>(); + LongStream.range(0, 100).forEach(objectId -> { + try { + SnapshotDiffObjectBuilder builder = + new SnapshotDiffObjectBuilder(objectId); + String key = "key" + objectId; + byte[] objectIdVal = codecRegistry.asRawData(objectId); + byte[] keyBytes = codecRegistry.asRawData(key); + if (objectId >= 0 && objectId <= 25 || + objectId >= 50 && objectId <= 100) { + oldObjectIdKeyMap.put(objectIdVal, keyBytes); + builder.withOldKeyName(key); + } + if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0 || + objectId > 25 && objectId < 50) { + newObjectIdKeyMap.put(objectIdVal, keyBytes); + builder.withNewKeyName(key); } - }); - String volumeName = "vol"; - String bucketName = "buck"; - String fromSnapName = "fs"; - String toSnapName = "ts"; - UUID fromSnapId = UUID.randomUUID(); - UUID toSnapId = UUID.randomUUID(); - - OmKeyInfo fromKeyInfo = mock(OmKeyInfo.class); - OmKeyInfo toKeyInfo = mock(OmKeyInfo.class); - // This is temporary to make sure that - // SnapshotDeletingService#isBlockLocationInfoSame always return true. - when(toKeyInfo.isHsync()).thenReturn(true); - when(fromKeyInfo.isHsync()).thenReturn(true); - - Table fromSnapTable = mock(Table.class); - Table toSnapTable = mock(Table.class); - when(fromSnapTable.get(anyString())).thenReturn(fromKeyInfo); - when(toSnapTable.get(anyString())).thenReturn(toKeyInfo); - - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); - - setupMocksForRunningASnapDiff(volumeName, bucketName); - setUpSnapshots(volumeName, bucketName, - fromSnapName, toSnapName, fromSnapId, toSnapId); - String jobKey = fromSnapId + DELIMITER + toSnapId; - - SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId", - JobStatus.IN_PROGRESS, volumeName, - bucketName, fromSnapName, toSnapName, - true, diffMap.size()); - - snapshotDiffManager.getSnapDiffJobTable().put(jobKey, snapshotDiffJob); - - snapshotDiffManager.generateDiffReport("jobId", fromSnapTable, - toSnapTable, objectIdToDiffObject, oldObjectIdKeyMap, - newObjectIdKeyMap, volumeName, bucketName, fromSnapName, toSnapName, - false, null, null); - - snapshotDiffJob.setStatus(JobStatus.DONE); - snapshotDiffManager.getSnapDiffJobTable().put(jobKey, snapshotDiffJob); - - SnapshotDiffReportOzone snapshotDiffReportOzone = - snapshotDiffManager.createPageResponse(snapshotDiffJob, volumeName, - bucketName, fromSnapName, toSnapName, - 0, Integer.MAX_VALUE); - Set expectedOrder = new LinkedHashSet<>(); - expectedOrder.add(SnapshotDiffReport.DiffType.DELETE); - expectedOrder.add(SnapshotDiffReport.DiffType.RENAME); - expectedOrder.add(SnapshotDiffReport.DiffType.CREATE); - expectedOrder.add(SnapshotDiffReport.DiffType.MODIFY); - - Set actualOrder = new LinkedHashSet<>(); - for (SnapshotDiffReport.DiffReportEntry entry : - snapshotDiffReportOzone.getDiffList()) { - actualOrder.add(entry.getType()); - - long objectId = Long.parseLong( - DFSUtilClient.bytes2String(entry.getSourcePath()).substring(4)); - Assertions.assertEquals(diffMap.get(objectId), entry.getType()); + if (objectId >= 0 && objectId <= 25 && objectId % 4 == 1) { + String renamedKey = "renamed-key" + objectId; + byte[] renamedKeyBytes = codecRegistry.asRawData(renamedKey); + newObjectIdKeyMap.put(objectIdVal, renamedKeyBytes); + diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME); + builder.withOldKeyName(key); + builder.withNewKeyName(renamedKey); + } + objectIdToDiffObject.put(objectIdVal, builder.build()); + if (objectId >= 50 && objectId <= 100 || + objectId >= 0 && objectId <= 25 && objectId % 4 > 1) { + diffMap.put(objectId, SnapshotDiffReport.DiffType.DELETE); + } + if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0) { + diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY); + } + if (objectId > 25 && objectId < 50) { + diffMap.put(objectId, SnapshotDiffReport.DiffType.CREATE); + } + } catch (IOException e) { + throw new RuntimeException(e); } - Assertions.assertEquals(expectedOrder, actualOrder); + }); + + String volumeName = "vol"; + String bucketName = "buck"; + String fromSnapName = "fs"; + String toSnapName = "ts"; + + OmKeyInfo fromKeyInfo = mock(OmKeyInfo.class); + OmKeyInfo toKeyInfo = mock(OmKeyInfo.class); + // This is temporary to make sure that + // SnapshotDeletingService#isBlockLocationInfoSame always return true. + when(toKeyInfo.isHsync()).thenReturn(true); + when(fromKeyInfo.isHsync()).thenReturn(true); + + Table fromSnapTable = mock(Table.class); + Table toSnapTable = mock(Table.class); + when(fromSnapTable.get(anyString())).thenReturn(fromKeyInfo); + when(toSnapTable.get(anyString())).thenReturn(toKeyInfo); + + + SnapshotDiffManager spy = spy(snapshotDiffManager); + doReturn(true).when(spy) + .areDiffJobAndSnapshotsActive(volumeName, bucketName, fromSnapName, + toSnapName); + + long totalDiffEntries = spy.generateDiffReport("jobId", + fromSnapTable, toSnapTable, objectIdToDiffObject, oldObjectIdKeyMap, + newObjectIdKeyMap, volumeName, bucketName, fromSnapName, toSnapName, + false, Optional.empty(), Optional.empty()); + + assertEquals(100, totalDiffEntries); + SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId", + JobStatus.DONE, "vol", "buck", "fs", "ts", + true, diffMap.size()); + SnapshotDiffReportOzone snapshotDiffReportOzone = + snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", + "buck", "fs", "ts", + 0, Integer.MAX_VALUE); + Set expectedOrder = new LinkedHashSet<>(); + expectedOrder.add(SnapshotDiffReport.DiffType.DELETE); + expectedOrder.add(SnapshotDiffReport.DiffType.RENAME); + expectedOrder.add(SnapshotDiffReport.DiffType.CREATE); + expectedOrder.add(SnapshotDiffReport.DiffType.MODIFY); + + Set actualOrder = new LinkedHashSet<>(); + for (DiffReportEntry entry : + snapshotDiffReportOzone.getDiffList()) { + actualOrder.add(entry.getType()); + + long objectId = Long.parseLong( + DFSUtilClient.bytes2String(entry.getSourcePath()).substring(4)); + assertEquals(diffMap.get(objectId), entry.getType()); } + assertEquals(expectedOrder, actualOrder); } - private SnapshotDiffReport.DiffReportEntry getTestDiffEntry(String jobId, - int idx) throws IOException { - return new SnapshotDiffReport.DiffReportEntry( + private DiffReportEntry getTestDiffEntry(String jobId, + int idx) throws IOException { + return new DiffReportEntry( SnapshotDiffReport.DiffType.values()[idx % SnapshotDiffReport.DiffType.values().length], codecRegistry.asRawData(jobId + DELIMITER + idx)); } /** - Testing generateDiffReport function by providing PersistentMap containing - objectId Map of diff keys to be checked with their corresponding key names. + * Testing generateDiffReport function by providing PersistentMap containing + * objectId Map of diff keys to be checked with their corresponding key names. */ @ParameterizedTest @CsvSource({"0,10,1000", "1,10,8", "1000,1000,10", "-1,1000,10000", "1,0,1000", "1,-1,1000"}) - public void testCreatePageResponse(int startIdx, int pageSize, - int totalNumberOfRecords) throws IOException { - // Mocking RocksDbPersistentMap constructor to use stubbed - // implementation instead. - Map - cfHandleRocksDbPersistentMap = new HashMap<>(); - try (MockedConstruction mockedRocksDbPersistentMap = - Mockito.mockConstruction(RocksDbPersistentMap.class, - (mock, context) -> { - ColumnFamilyHandle cf = - (ColumnFamilyHandle) context.arguments().stream() - .filter(arg -> arg instanceof ColumnFamilyHandle) - .findFirst().get(); - cfHandleRocksDbPersistentMap.put(cf, mock); - PersistentMap obj = - new SnapshotTestUtils.StubbedPersistentMap<>(); - when(mock.iterator()).thenReturn(obj.iterator()); - when(mock.iterator(any(Optional.class), - any(Optional.class))).thenAnswer(i -> - obj.iterator(i.getArgument(0), i.getArgument(1))); - Mockito.when(mock.get(Matchers.any())) - .thenAnswer(i -> obj.get(i.getArgument(0))); - Mockito.doAnswer((Answer) i -> { - obj.put(i.getArgument(0), i.getArgument(1)); - return null; - }).when(mock).put(Matchers.any(), Matchers.any()); - })) { - String testJobId = "jobId"; - String testJobId2 = "jobId2"; - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); - IntStream.range(0, totalNumberOfRecords).boxed().forEach(idx -> { - try { - cfHandleRocksDbPersistentMap.get(snapdiffReportCFH) - .put(codecRegistry.asRawData(SnapshotDiffManager - .getReportKeyForIndex(testJobId, idx)), - codecRegistry.asRawData(getTestDiffEntry(testJobId, idx))); - cfHandleRocksDbPersistentMap.get(snapdiffReportCFH) - .put(codecRegistry.asRawData(SnapshotDiffManager - .getReportKeyForIndex(testJobId2, idx)), - codecRegistry.asRawData(getTestDiffEntry(testJobId2, idx))); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId, - SnapshotDiffResponse.JobStatus.DONE, "vol", "buck", "fs", "ts", - true, totalNumberOfRecords); - SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2, - SnapshotDiffResponse.JobStatus.DONE, "vol", "buck", "fs", "ts", - true, totalNumberOfRecords); - cfHandleRocksDbPersistentMap.get(snapdiffJobCFH) - .put(codecRegistry.asRawData(testJobId), snapshotDiffJob); - cfHandleRocksDbPersistentMap.get(snapdiffJobCFH) - .put(codecRegistry.asRawData(testJobId), snapshotDiffJob2); - if (pageSize <= 0 || startIdx < 0) { - Assertions.assertThrows(IllegalArgumentException.class, - () -> snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", - "buck", "fs", "ts", startIdx, pageSize)); - return; - } - SnapshotDiffReportOzone snapshotDiffReportOzone = - snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", - "buck", "fs", "ts", - startIdx, pageSize); - int expectedTotalNumberOfRecords = - Math.max(Math.min(pageSize, totalNumberOfRecords - startIdx), 0); - Assertions.assertEquals(snapshotDiffReportOzone.getDiffList().size(), - expectedTotalNumberOfRecords); - - int idx = startIdx; - for (SnapshotDiffReport.DiffReportEntry entry : - snapshotDiffReportOzone.getDiffList()) { - Assertions.assertEquals(getTestDiffEntry(testJobId, idx), entry); - idx++; + public void testCreatePageResponse(int startIdx, + int pageSize, + int totalNumberOfRecords) + throws IOException, RocksDBException { + String testJobId = "jobId"; + String testJobId2 = "jobId2"; + + IntStream.range(0, totalNumberOfRecords).boxed().forEach(idx -> { + try { + db.get().put(snapDiffReportTable, + codecRegistry.asRawData(SnapshotDiffManager + .getReportKeyForIndex(testJobId, idx)), + codecRegistry.asRawData(getTestDiffEntry(testJobId, idx))); + db.get().put(snapDiffReportTable, + codecRegistry.asRawData(testJobId2 + DELIMITER + idx), + codecRegistry.asRawData(getTestDiffEntry(testJobId2, idx))); + } catch (IOException | RocksDBException e) { + throw new RuntimeException(e); } + }); + + SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId, + JobStatus.DONE, "vol", "buck", "fs", "ts", + true, totalNumberOfRecords); + + SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2, + JobStatus.DONE, "vol", "buck", "fs", "ts", + true, totalNumberOfRecords); + + db.get().put(snapDiffJobTable, + codecRegistry.asRawData(testJobId), + codecRegistry.asRawData(snapshotDiffJob)); + + db.get().put(snapDiffJobTable, + codecRegistry.asRawData(testJobId2), + codecRegistry.asRawData(snapshotDiffJob2)); + + if (pageSize <= 0 || startIdx < 0) { + Assertions.assertThrows(IllegalArgumentException.class, + () -> snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", + "buck", "fs", "ts", startIdx, pageSize)); + return; + } + SnapshotDiffReportOzone snapshotDiffReportOzone = + snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", + "buck", "fs", "ts", + startIdx, pageSize); + int expectedTotalNumberOfRecords = + Math.max(Math.min(pageSize, totalNumberOfRecords - startIdx), 0); + assertEquals(snapshotDiffReportOzone.getDiffList().size(), + expectedTotalNumberOfRecords); + + int idx = startIdx; + for (DiffReportEntry entry : snapshotDiffReportOzone.getDiffList()) { + assertEquals(getTestDiffEntry(testJobId, idx), entry); + idx++; } } /** * Once a job is cancelled, it stays in the table until * SnapshotDiffCleanupService removes it. - * * Job response until that happens, is CANCELLED. */ @Test - public void testGetSnapshotDiffReportForCancelledJob() - throws IOException { - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); + public void testGetSnapshotDiffReportForCancelledJob() throws IOException { String volumeName = "vol-" + RandomStringUtils.randomNumeric(5); String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5); @@ -732,55 +847,56 @@ public void testGetSnapshotDiffReportForCancelledJob() setUpSnapshots(volumeName, bucketName, fromSnapshotName, toSnapshotName, fromSnapshotUUID, toSnapshotUUID); - PersistentMap snapDiffJobTable = + PersistentMap snapDiffJobMap = snapshotDiffManager.getSnapDiffJobTable(); String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID; - SnapshotDiffJob diffJob = snapDiffJobTable.get(diffJobKey); + SnapshotDiffJob diffJob = snapDiffJobMap.get(diffJobKey); Assertions.assertNull(diffJob); + + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).generateSnapshotDiffReport(eq(diffJobKey), + anyString(), eq(volumeName), eq(bucketName), eq(fromSnapshotName), + eq(toSnapshotName), eq(false)); + // Submit a new job. - SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager - .getSnapshotDiffReport(volumeName, bucketName, - fromSnapshotName, toSnapshotName, - 0, 0, false); + SnapshotDiffResponse snapshotDiffResponse = + spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName, + toSnapshotName, 0, 0, false); - Assertions.assertEquals(JobStatus.IN_PROGRESS, + assertEquals(JobStatus.IN_PROGRESS, snapshotDiffResponse.getJobStatus()); // Cancel the job. - snapshotDiffManager.cancelSnapshotDiff(volumeName, bucketName, + spy.cancelSnapshotDiff(volumeName, bucketName, fromSnapshotName, toSnapshotName); // Job status should be cancelled until the cleanup // service removes the job from the table. - snapshotDiffResponse = snapshotDiffManager - .getSnapshotDiffReport(volumeName, bucketName, - fromSnapshotName, toSnapshotName, - 0, 0, false); + snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName, + fromSnapshotName, toSnapshotName, 0, 0, false); - Assertions.assertEquals(JobStatus.CANCELLED, + assertEquals(JobStatus.CANCELLED, snapshotDiffResponse.getJobStatus()); // Check snapDiffJobTable. - diffJob = snapDiffJobTable.get(diffJobKey); - Assertions.assertNotNull(diffJob); - Assertions.assertEquals(JobStatus.CANCELLED, + diffJob = snapDiffJobMap.get(diffJobKey); + assertNotNull(diffJob); + assertEquals(JobStatus.CANCELLED, diffJob.getStatus()); // Response should still be cancelled. - snapshotDiffResponse = snapshotDiffManager - .getSnapshotDiffReport(volumeName, bucketName, - fromSnapshotName, toSnapshotName, - 0, 0, false); + snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName, + fromSnapshotName, toSnapshotName, 0, 0, false); - Assertions.assertEquals(JobStatus.CANCELLED, + assertEquals(JobStatus.CANCELLED, snapshotDiffResponse.getJobStatus()); // Check snapDiffJobTable. - diffJob = snapDiffJobTable.get(diffJobKey); - Assertions.assertNotNull(diffJob); - Assertions.assertEquals(JobStatus.CANCELLED, + diffJob = snapDiffJobMap.get(diffJobKey); + assertNotNull(diffJob); + assertEquals(JobStatus.CANCELLED, diffJob.getStatus()); } @@ -807,8 +923,6 @@ public void testSnapshotDiffCancelFailure(JobStatus jobStatus, JobCancelResult cancelResult, boolean jobIsCancelled) throws IOException { - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); String volumeName = "vol-" + RandomStringUtils.randomNumeric(5); String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5); @@ -824,7 +938,7 @@ public void testSnapshotDiffCancelFailure(JobStatus jobStatus, setUpSnapshots(volumeName, bucketName, fromSnapshotName, toSnapshotName, fromSnapshotUUID, toSnapshotUUID); - PersistentMap snapDiffJobTable = + PersistentMap snapDiffJobMap = snapshotDiffManager.getSnapDiffJobTable(); String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID; @@ -833,27 +947,21 @@ public void testSnapshotDiffCancelFailure(JobStatus jobStatus, jobId, jobStatus, volumeName, bucketName, fromSnapshotName, toSnapshotName, true, 10); - snapDiffJobTable.put(diffJobKey, snapshotDiffJob); + snapDiffJobMap.put(diffJobKey, snapshotDiffJob); SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager .cancelSnapshotDiff(volumeName, bucketName, fromSnapshotName, toSnapshotName); - Assertions.assertEquals(cancelResult, - snapshotDiffResponse.getJobCancelResult()); + assertEquals(cancelResult, snapshotDiffResponse.getJobCancelResult()); if (jobIsCancelled) { - Assertions.assertEquals(JobStatus.CANCELLED, - snapshotDiffResponse.getJobStatus()); + assertEquals(JobStatus.CANCELLED, snapshotDiffResponse.getJobStatus()); } } @Test - public void testCancelNewSnapshotDiff() - throws IOException { - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); - + public void testCancelNewSnapshotDiff() throws IOException { String volumeName = "vol-" + RandomStringUtils.randomNumeric(5); String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5); @@ -874,7 +982,7 @@ public void testCancelNewSnapshotDiff() // The job doesn't exist on the SnapDiffJob table and // trying to cancel it should lead to NEW_JOB cancel result. - Assertions.assertEquals(JobCancelResult.NEW_JOB, + assertEquals(JobCancelResult.NEW_JOB, snapshotDiffResponse.getJobCancelResult()); } @@ -897,48 +1005,46 @@ public void testListSnapshotDiffJobs(String jobStatus, boolean listAll, boolean containsJob) throws IOException { - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); - String volumeName = "vol-" + RandomStringUtils.randomNumeric(5); String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5); - String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5); String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5); UUID fromSnapshotUUID = UUID.randomUUID(); UUID toSnapshotUUID = UUID.randomUUID(); - setupMocksForRunningASnapDiff(volumeName, bucketName); - setUpSnapshots(volumeName, bucketName, fromSnapshotName, toSnapshotName, fromSnapshotUUID, toSnapshotUUID); - PersistentMap snapDiffJobTable = + PersistentMap snapDiffJobMap = snapshotDiffManager.getSnapDiffJobTable(); String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID; - SnapshotDiffJob diffJob = snapDiffJobTable.get(diffJobKey); - Assertions.assertNull(diffJob); + SnapshotDiffJob diffJob = snapDiffJobMap.get(diffJobKey); + assertNull(diffJob); // There are no jobs in the table, therefore // the response list should be empty. List jobList = snapshotDiffManager .getSnapshotDiffJobList(volumeName, bucketName, jobStatus, listAll); - Assertions.assertTrue(jobList.isEmpty()); + assertTrue(jobList.isEmpty()); + + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).generateSnapshotDiffReport(eq(diffJobKey), + anyString(), eq(volumeName), eq(bucketName), eq(fromSnapshotName), + eq(toSnapshotName), eq(false)); // SnapshotDiffReport - SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager - .getSnapshotDiffReport(volumeName, bucketName, - fromSnapshotName, toSnapshotName, - 0, 0, false); + SnapshotDiffResponse snapshotDiffResponse = + spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName, + toSnapshotName, 0, 0, false); - Assertions.assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS, + assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS, snapshotDiffResponse.getJobStatus()); - diffJob = snapDiffJobTable.get(diffJobKey); - Assertions.assertNotNull(diffJob); - Assertions.assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS, + diffJob = snapDiffJobMap.get(diffJobKey); + assertNotNull(diffJob); + assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS, diffJob.getStatus()); jobList = snapshotDiffManager @@ -949,117 +1055,314 @@ public void testListSnapshotDiffJobs(String jobStatus, // there should be a response. // Otherwise, response list should be empty. if (containsJob) { - Assertions.assertTrue(jobList.contains(diffJob)); + assertTrue(jobList.contains(diffJob)); } else { - Assertions.assertTrue(jobList.isEmpty()); + assertTrue(jobList.isEmpty()); } } @Test public void testListSnapDiffWithInvalidStatus() throws IOException { - SnapshotDiffManager snapshotDiffManager = - getMockedSnapshotDiffManager(10); - String volumeName = "vol-" + RandomStringUtils.randomNumeric(5); String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5); - String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5); String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5); UUID fromSnapshotUUID = UUID.randomUUID(); UUID toSnapshotUUID = UUID.randomUUID(); - setupMocksForRunningASnapDiff(volumeName, bucketName); - setUpSnapshots(volumeName, bucketName, fromSnapshotName, toSnapshotName, fromSnapshotUUID, toSnapshotUUID); - // SnapshotDiffReport - snapshotDiffManager.getSnapshotDiffReport(volumeName, bucketName, - fromSnapshotName, toSnapshotName, - 0, 0, false); + String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID; + SnapshotDiffManager spy = spy(snapshotDiffManager); + + doNothing().when(spy).generateSnapshotDiffReport(eq(diffJobKey), + anyString(), eq(volumeName), eq(bucketName), eq(fromSnapshotName), + eq(toSnapshotName), eq(false)); + + spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName, + toSnapshotName, 0, 0, false); // Invalid status, without listAll true, results in an exception. - Assertions.assertThrows(IOException.class, () -> snapshotDiffManager + assertThrows(IOException.class, () -> snapshotDiffManager .getSnapshotDiffJobList(volumeName, bucketName, "invalid", false)); } + @Test + public void testGenerateDiffReportWhenThereInEntry() { + PersistentMap objectIdToDiffObject = + new StubbedPersistentMap<>(); + PersistentMap oldObjIdToKeyMap = + new StubbedPersistentMap<>(); + PersistentMap newObjIdToKeyMap = + new StubbedPersistentMap<>(); + + long totalDiffEntries = snapshotDiffManager.generateDiffReport("jobId", + keyInfoTable, + keyInfoTable, + objectIdToDiffObject, + oldObjIdToKeyMap, + newObjIdToKeyMap, + "volume", + "bucket", + "fromSnapshot", + "toSnapshot", + false, + Optional.empty(), + Optional.empty()); + + assertEquals(0, totalDiffEntries); + } + + @Test + public void testGenerateDiffReportFailure() throws IOException { + String volumeName = "vol-" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5); + String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5); + String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5); + + PersistentMap objectIdToDiffObject = + new SnapshotTestUtils.StubbedPersistentMap<>(); + PersistentMap oldObjIdToKeyMap = + new StubbedPersistentMap<>(); + PersistentMap newObjIdToKeyMap = + new StubbedPersistentMap<>(); + objectIdToDiffObject.put(codecRegistry.asRawData("randomKey"), + new SnapshotDiffObjectBuilder(1L).build()); + + SnapshotDiffManager spy = spy(snapshotDiffManager); + doReturn(true).when(spy) + .areDiffJobAndSnapshotsActive(volumeName, bucketName, + fromSnapshotName, toSnapshotName); + + IllegalStateException exception = assertThrows(IllegalStateException.class, + () -> spy.generateDiffReport("jobId", + keyInfoTable, + keyInfoTable, + objectIdToDiffObject, + oldObjIdToKeyMap, + newObjIdToKeyMap, + volumeName, + bucketName, + fromSnapshotName, + toSnapshotName, + false, + Optional.empty(), + Optional.empty()) + ); + assertEquals("Old and new key name both are null", + exception.getMessage()); + } + + /** + * Tests that IN_PROGRESS jobs are submitted to the executor on the service + * startup. + */ + @Test + public void testLoadJobsOnStartUp() throws Exception { + for (int i = 0; i < snapshotInfoList.size(); i++) { + uploadSnapshotDiffJobToDb(snapshotInfo, snapshotInfoList.get(i), + snapDiffJobs.get(i)); + } + + SnapshotDiffManager spy = spy(snapshotDiffManager); + + doAnswer(invocation -> { + SnapshotDiffJob diffJob = getSnapshotDiffJobFromDb(snapshotInfo, + snapshotInfoList.get(1)); + diffJob.setTotalDiffEntries(1L); + diffJob.setStatus(DONE); + uploadSnapshotDiffJobToDb(snapshotInfo, + snapshotInfoList.get(1), + diffJob); + return null; + } + ).when(spy).generateSnapshotDiffReport(anyString(), anyString(), + eq(VOLUME_NAME), eq(BUCKET_NAME), eq(snapshotInfo.getName()), + eq(snapshotInfoList.get(1).getName()), eq(false)); + + spy.loadJobsOnStartUp(); + + // Wait for sometime to make sure that job finishes. + Thread.sleep(1000L); + + SnapshotDiffJob snapDiffJob = getSnapshotDiffJobFromDb(snapshotInfo, + snapshotInfoList.get(1)); + + assertEquals(DONE, snapDiffJob.getStatus()); + assertEquals(1L, snapDiffJob.getTotalDiffEntries()); + } + + private SnapshotDiffJob getSnapshotDiffJobFromDb(SnapshotInfo fromSnapshot, + SnapshotInfo toSnapshot) + throws IOException, RocksDBException { + String jobKey = generateSnapDiffJobKey.apply(fromSnapshot, toSnapshot); + + byte[] bytes = db.get() + .get(snapDiffJobTable, codecRegistry.asRawData(jobKey)); + return codecRegistry.asObject(bytes, SnapshotDiffJob.class); + } + + private void uploadSnapshotDiffJobToDb(SnapshotInfo fromSnapshot, + SnapshotInfo toSnapshot, + SnapshotDiffJob diffJob) + throws IOException, RocksDBException { + String jobKey = generateSnapDiffJobKey.apply(fromSnapshot, toSnapshot); + + byte[] keyBytes = codecRegistry.asRawData(jobKey); + byte[] jobBytes = codecRegistry.asRawData(diffJob); + db.get().put(snapDiffJobTable, keyBytes, jobBytes); + } + + private static Stream threadPoolFullScenarios() { + return Stream.of( + Arguments.of("When there is a wait time between job batches", + 500L, 45, 0), + Arguments.of("When there is no wait time between job batches", + 0L, 20, 25) + ); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("threadPoolFullScenarios") + public void testThreadPoolIsFull(String description, + long waitBetweenBatches, + int expectInProgressJobsCount, + int expectRejectedJobsCount) + throws Exception { + ExecutorService executorService = new ThreadPoolExecutor(100, 100, 0, + TimeUnit.MILLISECONDS, new SynchronousQueue<>() + ); + + List snapshotInfos = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + UUID snapshotId = UUID.randomUUID(); + String snapshotName = "snap-" + snapshotId; + SnapshotInfo snapInfo = new SnapshotInfo.Builder() + .setSnapshotId(snapshotId) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setName(snapshotName) + .setSnapshotPath("fromSnapshotPath") + .setCheckpointDir("fromSnapshotCheckpointDir") + .build(); + snapshotInfos.add(snapInfo); + + when(snapshotInfoTable.get(getTableKey(VOLUME_NAME, BUCKET_NAME, + snapshotName))).thenReturn(snapInfo); + } + + SnapshotDiffManager spy = spy(snapshotDiffManager); + + for (int i = 0; i < snapshotInfos.size(); i++) { + for (int j = i + 1; j < snapshotInfos.size(); j++) { + String fromSnapshotName = snapshotInfos.get(i).getName(); + String toSnapshotName = snapshotInfos.get(j).getName(); + + doAnswer(invocation -> { + Thread.sleep(250L); + return null; + }).when(spy).generateSnapshotDiffReport(anyString(), anyString(), + eq(VOLUME_NAME), eq(BUCKET_NAME), eq(fromSnapshotName), + eq(toSnapshotName), eq(false)); + } + } + + List> futures = new ArrayList<>(); + for (int i = 0; i < snapshotInfos.size(); i++) { + for (int j = i + 1; j < snapshotInfos.size(); j++) { + String fromSnapshotName = snapshotInfos.get(i).getName(); + String toSnapshotName = snapshotInfos.get(j).getName(); + + Future future = executorService.submit( + () -> submitJob(spy, fromSnapshotName, toSnapshotName)); + futures.add(future); + } + Thread.sleep(waitBetweenBatches); + } + + // Wait to make sure that all jobs finish before assertion. + Thread.sleep(1000L); + int inProgressJobsCount = 0; + int rejectedJobsCount = 0; + + for (Future future : futures) { + SnapshotDiffResponse response = future.get(); + if (response.getJobStatus() == IN_PROGRESS) { + inProgressJobsCount++; + } else if (response.getJobStatus() == REJECTED) { + rejectedJobsCount++; + } else { + throw new IllegalStateException("Unexpected job status."); + } + } + + assertEquals(expectInProgressJobsCount, inProgressJobsCount); + assertEquals(expectRejectedJobsCount, rejectedJobsCount); + + int notFoundJobs = 0; + for (int i = 0; i < snapshotInfos.size(); i++) { + for (int j = i + 1; j < snapshotInfos.size(); j++) { + SnapshotDiffJob diffJob = + getSnapshotDiffJobFromDb(snapshotInfos.get(i), + snapshotInfos.get(j)); + if (diffJob == null) { + notFoundJobs++; + } + } + } + + // assert that rejected jobs were removed from the job table as well. + assertEquals(expectRejectedJobsCount, notFoundJobs); + executorService.shutdown(); + } + + private SnapshotDiffResponse submitJob(SnapshotDiffManager diffManager, + String fromSnapshotName, + String toSnapshotName) { + try { + return diffManager.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, + fromSnapshotName, toSnapshotName, 0, 1000, false); + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + private void setUpSnapshots(String volumeName, String bucketName, String fromSnapshotName, String toSnapshotName, UUID fromSnapshotUUID, UUID toSnapshotUUID) throws IOException { - try (MockedStatic mockedSnapUtils = - Mockito.mockStatic(SnapshotUtils.class)) { - // Create 1st snapshot. - SnapshotInfo fromSnapshotInfo = - getSnapshotInfoInstance(volumeName, bucketName, - fromSnapshotName, fromSnapshotUUID); - mockedSnapUtils.when(() -> SnapshotUtils - .getSnapshotInfo(ozoneManager, volumeName, - bucketName, fromSnapshotName)) - .thenReturn(fromSnapshotInfo); - - String fromSnapKey = SnapshotInfo - .getTableKey(fromSnapshotInfo.getVolumeName(), - fromSnapshotInfo.getBucketName(), fromSnapshotInfo.getName()); - - Mockito.when(ozoneManager.getMetadataManager() - .getSnapshotInfoTable().get(fromSnapKey)) - .thenReturn(fromSnapshotInfo); - - mockedSnapUtils.when(() -> SnapshotUtils - .getSnapshotInfo(ozoneManager, fromSnapKey)) - .thenReturn(fromSnapshotInfo); - - OmSnapshot omSnapshotFrom = getMockedOmSnapshot(fromSnapKey); - snapshotCache.put(fromSnapKey, omSnapshotFrom); - - // Create 2nd snapshot. - SnapshotInfo toSnapshotInfo = - getSnapshotInfoInstance(volumeName, bucketName, - toSnapshotName, toSnapshotUUID); - - mockedSnapUtils.when( - () -> SnapshotUtils.getSnapshotInfo(ozoneManager, - volumeName, bucketName, toSnapshotName)) - .thenReturn(toSnapshotInfo); - - String toSnapKey = SnapshotInfo - .getTableKey(toSnapshotInfo.getVolumeName(), - toSnapshotInfo.getBucketName(), toSnapshotInfo.getName()); - - Mockito.when(ozoneManager.getMetadataManager() - .getSnapshotInfoTable().get(toSnapKey)).thenReturn(toSnapshotInfo); - - mockedSnapUtils.when(() -> SnapshotUtils - .getSnapshotInfo(ozoneManager, toSnapKey)) - .thenReturn(toSnapshotInfo); - - OmSnapshot omSnapshotTo = getMockedOmSnapshot(toSnapKey); - snapshotCache.put(toSnapKey, omSnapshotTo); - } + + + SnapshotInfo fromSnapshotInfo = + getSnapshotInfoInstance(volumeName, bucketName, + fromSnapshotName, fromSnapshotUUID); + SnapshotInfo toSnapshotInfo = + getSnapshotInfoInstance(volumeName, bucketName, + toSnapshotName, toSnapshotUUID); + + String fromSnapKey = getTableKey(volumeName, bucketName, fromSnapshotName); + String toSnapKey = getTableKey(volumeName, bucketName, toSnapshotName); + + when(snapshotInfoTable.get(fromSnapKey)).thenReturn(fromSnapshotInfo); + when(snapshotInfoTable.get(toSnapKey)).thenReturn(toSnapshotInfo); } - private SnapshotInfo getSnapshotInfoInstance( - String volumeName, String bucketName, - String snapshotName, UUID snapshotUUID) { - SnapshotInfo snapshotInfo = SnapshotInfo - .newInstance(volumeName, bucketName, - snapshotName, snapshotUUID, - System.currentTimeMillis()); - snapshotInfo.setSnapshotStatus(SnapshotInfo - .SnapshotStatus.SNAPSHOT_ACTIVE); - return snapshotInfo; + private SnapshotInfo getSnapshotInfoInstance(String volumeName, + String bucketName, + String snapshotName, + UUID snapshotUUID) { + SnapshotInfo info = SnapshotInfo.newInstance(volumeName, bucketName, + snapshotName, snapshotUUID, System.currentTimeMillis()); + info.setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE); + return info; } private void setupMocksForRunningASnapDiff( String volumeName, String bucketName) throws IOException { - Mockito.when(ozoneManager.getMetadataManager().getSnapshotInfoTable()) - .thenReturn(Mockito.mock(Table.class)); - Mockito.when(ozoneManager.getMetadataManager().getBucketTable()) - .thenReturn(Mockito.mock(Table.class)); - Map keyTableMap = new HashMap<>(); keyTableMap.put(BucketLayout.FILE_SYSTEM_OPTIMIZED, OmMetadataManagerImpl.FILE_TABLE); @@ -1069,11 +1372,9 @@ private void setupMocksForRunningASnapDiff( OmMetadataManagerImpl.KEY_TABLE); for (Map.Entry entry : keyTableMap.entrySet()) { - Mockito.when(ozoneManager.getMetadataManager() - .getKeyTable(entry.getKey())) - .thenReturn(Mockito.mock(Table.class)); - Mockito.when(ozoneManager.getMetadataManager() - .getKeyTable(entry.getKey()).getName()) + when(omMetadataManager.getKeyTable(entry.getKey())) + .thenReturn(keyInfoTable); + when(omMetadataManager.getKeyTable(entry.getKey()).getName()) .thenReturn(entry.getValue()); } @@ -1085,9 +1386,89 @@ private void setupMocksForRunningASnapDiff( .setOwner(ugi.getShortUserName()) .build(); - String bucketKey = ozoneManager.getMetadataManager() - .getBucketKey(volumeName, bucketName); - Mockito.when(ozoneManager.getMetadataManager().getBucketTable() - .get(bucketKey)).thenReturn(bucketInfo); + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + when(bucketInfoTable.get(bucketKey)).thenReturn(bucketInfo); + } + + @Test + public void testGetSnapshotDiffReportHappyCase() throws Exception { + SnapshotInfo fromSnapInfo = snapshotInfo; + SnapshotInfo toSnapInfo = snapshotInfoList.get(0); + + Set testDeltaFiles = new HashSet<>(); + + SnapshotDiffManager spy = spy(snapshotDiffManager); + + doReturn(testDeltaFiles).when(spy).getDeltaFiles(any(OmSnapshot.class), + any(OmSnapshot.class), anyList(), eq(fromSnapInfo), eq(toSnapInfo), + eq(false), anyMap(), anyString()); + + doReturn(testDeltaFiles).when(spy) + .getSSTFileListForSnapshot(any(OmSnapshot.class), anyList()); + + doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable), + any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap()); + doNothing().when(spy).checkReportsIntegrity(any(), anyInt(), anyInt()); + + doReturn(10L).when(spy).generateDiffReport(anyString(), + any(), any(), any(), any(), any(), anyString(), anyString(), + anyString(), anyString(), anyBoolean(), any(), any()); + doReturn(LEGACY).when(spy).getBucketLayout(VOLUME_NAME, BUCKET_NAME, + omMetadataManager); + + spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, fromSnapInfo.getName(), + toSnapInfo.getName(), 0, 1000, false); + + Thread.sleep(1000L); + spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, fromSnapInfo.getName(), + toSnapInfo.getName(), 0, 1000, false); + + SnapshotDiffJob snapDiffJob = getSnapshotDiffJobFromDb(fromSnapInfo, + toSnapInfo); + assertEquals(DONE, snapDiffJob.getStatus()); + assertEquals(10L, snapDiffJob.getTotalDiffEntries()); + } + + /** + * Tests that only QUEUED jobs are submitted to the executor and rest are + * short-circuited based on previous one. + */ + @Disabled + @Test + public void testGetSnapshotDiffReportJob() throws Exception { + for (int i = 0; i < jobStatuses.size(); i++) { + uploadSnapshotDiffJobToDb(snapshotInfo, snapshotInfoList.get(i), + snapDiffJobs.get(i)); + } + + SnapshotDiffManager spy = spy(snapshotDiffManager); + + doAnswer(invocation -> { + SnapshotDiffJob diffJob = getSnapshotDiffJobFromDb(snapshotInfo, + snapshotInfoList.get(0)); + diffJob.setTotalDiffEntries(1L); + diffJob.setStatus(DONE); + uploadSnapshotDiffJobToDb(snapshotInfo, + snapshotInfoList.get(0), + diffJob); + return null; + } + ).when(spy).generateSnapshotDiffReport(anyString(), anyString(), + eq(VOLUME_NAME), eq(BUCKET_NAME), eq(snapshotInfo.getName()), + eq(snapshotInfoList.get(0).getName()), eq(false)); + + for (int i = 0; i < snapshotInfoList.size(); i++) { + SnapshotDiffResponse snapshotDiffReport = + spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME, + snapshotInfo.getName(), snapshotInfoList.get(i).getName(), 0, + 1000, + false); + SnapshotDiffJob diffJob = snapDiffJobs.get(i); + if (diffJob.getStatus() == QUEUED) { + assertEquals(IN_PROGRESS, snapshotDiffReport.getJobStatus()); + } else { + assertEquals(diffJob.getStatus(), snapshotDiffReport.getJobStatus()); + } + } } }