diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java index de100d32ca47..30db0815aa9f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.utils.db.Codec; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffJobProto; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus; /** * POJO for Snapshot diff job. @@ -54,6 +55,13 @@ public static Codec getCodec() { // is FAILED. private String reason; + // Represents current sub-status of the job if the job is in IN_PROGRESS state. + private SubStatus subStatus; + // percentage of keys processed in the Object-ID generation phase. + // This is the most time-consuming phase as it loads both snapshots + // and reads from it populating the ObjectID-key map. + private double keysProcessedPct; + // Default constructor for Jackson Serializer. public SnapshotDiffJob() { @@ -69,7 +77,9 @@ public SnapshotDiffJob(long creationTime, String toSnapshot, boolean forceFullDiff, boolean disableNativeDiff, - long totalDiffEntries) { + long totalDiffEntries, + SubStatus subStatus, + double keysProcessedPct) { this.creationTime = creationTime; this.jobId = jobId; this.status = jobStatus; @@ -81,6 +91,8 @@ public SnapshotDiffJob(long creationTime, this.disableNativeDiff = disableNativeDiff; this.totalDiffEntries = totalDiffEntries; this.reason = StringUtils.EMPTY; + this.subStatus = subStatus; + this.keysProcessedPct = keysProcessedPct; } public String getJobId() { @@ -171,6 +183,22 @@ public void disableNativeDiff(boolean disableNativeDiffVal) { this.disableNativeDiff = disableNativeDiffVal; } + public SubStatus getSubStatus() { + return subStatus; + } + + public void setSubStatus(SubStatus subStatus) { + this.subStatus = subStatus; + } + + public double getKeysProcessedPct() { + return keysProcessedPct; + } + + public void setKeysProcessedPct(double keysProcessedPct) { + this.keysProcessedPct = keysProcessedPct; + } + @Override public String toString() { StringBuilder sb = new StringBuilder("creationTime : ").append(creationTime) @@ -187,6 +215,13 @@ public String toString() { if (StringUtils.isNotEmpty(reason)) { sb.append(", reason: ").append(reason); } + if (status.equals(JobStatus.IN_PROGRESS) && subStatus != null) { + sb.append(", subStatus: ").append(status); + if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO) || + subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS)) { + sb.append(String.format(", keysProcessedPercent: %.2f", keysProcessedPct)); + } + } return sb.toString(); } @@ -208,7 +243,9 @@ public boolean equals(Object other) { Objects.equals(this.forceFullDiff, otherJob.forceFullDiff) && Objects.equals(this.totalDiffEntries, otherJob.totalDiffEntries) && Objects.equals(this.disableNativeDiff, otherJob.disableNativeDiff) - && Objects.equals(this.reason, otherJob.reason); + && Objects.equals(this.reason, otherJob.reason) && + Objects.equals(this.subStatus, otherJob.subStatus) && + Objects.equals(this.keysProcessedPct, otherJob.keysProcessedPct); } return false; } @@ -217,7 +254,7 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(creationTime, jobId, status, volume, bucket, fromSnapshot, toSnapshot, forceFullDiff, disableNativeDiff, - totalDiffEntries, reason); + totalDiffEntries, reason, subStatus, keysProcessedPct); } public SnapshotDiffJobProto toProtoBuf() { @@ -232,6 +269,8 @@ public SnapshotDiffJobProto toProtoBuf() { .setForceFullDiff(forceFullDiff) .setDisableNativeDiff(disableNativeDiff) .setTotalDiffEntries(totalDiffEntries) + .setSubStatus(subStatus.toProtoBuf()) + .setKeysProcessedPct(keysProcessedPct) .build(); } @@ -247,7 +286,9 @@ public static SnapshotDiffJob getFromProtoBuf( diffJobProto.getToSnapshot(), diffJobProto.getForceFullDiff(), diffJobProto.getDisableNativeDiff(), - diffJobProto.getTotalDiffEntries()); + diffJobProto.getTotalDiffEntries(), + SubStatus.fromProtoBuf(diffJobProto.getSubStatus()), + diffJobProto.getKeysProcessedPct()); } /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java index 7e58652b1c49..ece5e72a94ff 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.snapshot; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffResponse.JobStatusProto; /** @@ -44,11 +45,31 @@ public static JobStatus fromProtobuf(JobStatusProto jobStatusProto) { return JobStatus.valueOf(jobStatusProto.name()); } } + /** + * Snapshot diff job sub-status enum. + */ + public enum SubStatus { + SST_FILE_DELTA_DAG_WALK, + SST_FILE_DELTA_FULL_DIFF, + OBJECT_ID_MAP_GEN_OBS, + OBJECT_ID_MAP_GEN_FSO, + DIFF_REPORT_GEN; + + public static SubStatus fromProtoBuf(OzoneManagerProtocolProtos.SnapshotDiffResponse.SubStatus subStatusProto) { + return SubStatus.valueOf(subStatusProto.name()); + } + + public OzoneManagerProtocolProtos.SnapshotDiffResponse.SubStatus toProtoBuf() { + return OzoneManagerProtocolProtos.SnapshotDiffResponse.SubStatus.valueOf(this.name()); + } + } private final SnapshotDiffReportOzone snapshotDiffReport; private final JobStatus jobStatus; private final long waitTimeInMs; private final String reason; + private SubStatus subStatus; + private double progressPercent = 0.0; public SnapshotDiffResponse(final SnapshotDiffReportOzone snapshotDiffReport, final JobStatus jobStatus, @@ -85,6 +106,14 @@ public String getReason() { return reason; } + public void setSubStatus(SubStatus subStatus) { + this.subStatus = subStatus; + } + + public void setProgressPercent(double progressPercent) { + this.progressPercent = progressPercent; + } + @Override public String toString() { StringBuilder str = new StringBuilder(); @@ -112,6 +141,15 @@ public String toString() { .append(". Please retry after ") .append(waitTimeInMs) .append(" ms.\n"); + if (subStatus != null) { + str.append("SubStatus : ") + .append(subStatus); + if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS) || + subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO)) { + str.append("Keys Processed Estimated Percentage : ") + .append(progressPercent); + } + } } return str.toString(); } diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index df97028a0f31..7c6afa04078d 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -912,6 +912,8 @@ message SnapshotDiffJobProto { optional uint64 totalDiffEntries = 9; optional string message = 10; optional bool disableNativeDiff = 11; + optional SnapshotDiffResponse.SubStatus subStatus = 12; + optional double keysProcessedPct = 13; } message OzoneObj { @@ -2101,10 +2103,19 @@ message SnapshotDiffResponse { CANCELLED = 6; } + enum SubStatus { + SST_FILE_DELTA_DAG_WALK = 1; + SST_FILE_DELTA_FULL_DIFF = 2; + OBJECT_ID_MAP_GEN_OBS = 3; + OBJECT_ID_MAP_GEN_FSO = 4; + DIFF_REPORT_GEN = 5; + } + optional SnapshotDiffReportProto snapshotDiffReport = 1; optional JobStatusProto jobStatus = 2; optional int64 waitTimeInMs = 3; optional string reason = 4; + optional SubStatus subStatus = 5; } message CancelSnapshotDiffResponse { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 99f34d863de9..17199debbd47 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -55,6 +55,11 @@ import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.DIFF_REPORT_GEN; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_FSO; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_OBS; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_DAG_WALK; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_FULL_DIFF; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -83,6 +88,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.stream.Stream; import org.apache.commons.io.file.PathUtils; @@ -489,11 +495,13 @@ public SnapshotDiffResponse getSnapshotDiffReport( fromSnapshotName, toSnapshotName, index, pageSize, forceFullDiff, disableNativeDiff); case IN_PROGRESS: - return new SnapshotDiffResponse( - new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName, - bucketName, fromSnapshotName, toSnapshotName, new ArrayList<>(), - null), - IN_PROGRESS, defaultWaitTime); + SnapshotDiffResponse response = new SnapshotDiffResponse( + new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName, bucketName, + fromSnapshotName, toSnapshotName, + new ArrayList<>(), null), IN_PROGRESS, defaultWaitTime); + response.setSubStatus(snapDiffJob.getSubStatus()); + response.setProgressPercent(snapDiffJob.getKeysProcessedPct()); + return response; case FAILED: return new SnapshotDiffResponse( new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName, @@ -750,8 +758,8 @@ private synchronized SnapshotDiffJob getSnapDiffReportStatus( if (snapDiffJob == null) { String jobId = UUID.randomUUID().toString(); snapDiffJob = new SnapshotDiffJob(System.currentTimeMillis(), jobId, - QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName, - forceFullDiff, disableNativeDiff, 0L); + QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName, forceFullDiff, + disableNativeDiff, 0L, null, 0.0); snapDiffJobTable.put(jobKey, snapDiffJob); } @@ -905,22 +913,24 @@ void generateSnapshotDiffReport(final String jobKey, // repetition while constantly checking if the job is cancelled. Callable[] methodCalls = new Callable[]{ () -> { + recordActivity(jobKey, OBJECT_ID_MAP_GEN_OBS); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsKeyTable, tsKeyTable, fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, performNonNativeDiff, tablePrefixes, objectIdToKeyNameMapForFromSnapshot, objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap, - oldParentIds, newParentIds, path.toString()); + oldParentIds, newParentIds, path.toString(), jobKey); return null; }, () -> { if (bucketLayout.isFileSystemOptimized()) { + recordActivity(jobKey, OBJECT_ID_MAP_GEN_FSO); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsDirTable, tsDirTable, fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, performNonNativeDiff, tablePrefixes, objectIdToKeyNameMapForFromSnapshot, objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap, - oldParentIds, newParentIds, path.toString()); + oldParentIds, newParentIds, path.toString(), jobKey); } return null; }, @@ -943,6 +953,7 @@ void generateSnapshotDiffReport(final String jobKey, return null; }, () -> { + recordActivity(jobKey, DIFF_REPORT_GEN); long totalDiffEntries = generateDiffReport(jobId, fsKeyTable, tsKeyTable, @@ -1019,12 +1030,11 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( final PersistentMap objectIdToIsDirMap, final Optional> oldParentIds, final Optional> newParentIds, - final String diffDir) throws IOException, RocksDBException { + final String diffDir, final String jobKey) throws IOException, RocksDBException { List tablesToLookUp = Collections.singletonList(fsTable.getName()); - Set deltaFiles = getDeltaFiles(fromSnapshot, toSnapshot, - tablesToLookUp, fsInfo, tsInfo, useFullDiff, tablePrefixes, diffDir); + tablesToLookUp, fsInfo, tsInfo, useFullDiff, tablePrefixes, diffDir, jobKey); // Workaround to handle deletes if native rocksDb tool for reading // tombstone is not loaded. @@ -1035,10 +1045,13 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes, fromDB); deltaFiles.addAll(inputFiles); } + if (LOG.isDebugEnabled()) { + LOG.debug("Computed Delta SST File Set, Total count = {} ", deltaFiles.size()); + } addToObjectIdMap(fsTable, tsTable, deltaFiles, !skipNativeDiff && isNativeLibsLoaded, oldObjIdToKeyMap, newObjIdToKeyMap, objectIdToIsDirMap, oldParentIds, - newParentIds, tablePrefixes); + newParentIds, tablePrefixes, jobKey); } @VisibleForTesting @@ -1051,7 +1064,7 @@ void addToObjectIdMap(Table fsTable, PersistentMap objectIdToIsDirMap, Optional> oldParentIds, Optional> newParentIds, - Map tablePrefixes) throws IOException, RocksDBException { + Map tablePrefixes, String jobKey) throws IOException, RocksDBException { if (deltaFiles.isEmpty()) { return; } @@ -1060,8 +1073,12 @@ void addToObjectIdMap(Table fsTable, fsTable.getName().equals(DIRECTORY_TABLE); SstFileSetReader sstFileReader = new SstFileSetReader(deltaFiles); validateEstimatedKeyChangesAreInLimits(sstFileReader); + long totalEstimatedKeysToProcess = sstFileReader.getEstimatedTotalKeys(); String sstFileReaderLowerBound = tablePrefix; String sstFileReaderUpperBound = null; + double stepIncreasePct = 0.1; + double[] checkpoint = new double[1]; + checkpoint[0] = stepIncreasePct; if (Strings.isNotEmpty(tablePrefix)) { char[] upperBoundCharArray = tablePrefix.toCharArray(); upperBoundCharArray[upperBoundCharArray.length - 1] += 1; @@ -1070,13 +1087,22 @@ void addToObjectIdMap(Table fsTable, try (Stream keysToCheck = nativeRocksToolsLoaded ? sstFileReader.getKeyStreamWithTombstone(sstFileReaderLowerBound, sstFileReaderUpperBound) : sstFileReader.getKeyStream(sstFileReaderLowerBound, sstFileReaderUpperBound)) { + AtomicLong keysProcessed = new AtomicLong(0); keysToCheck.forEach(key -> { + if (totalEstimatedKeysToProcess > 0) { + double progressPct = (double) keysProcessed.get() / totalEstimatedKeysToProcess; + if (progressPct >= checkpoint[0]) { + updateProgress(jobKey, progressPct); + checkpoint[0] += stepIncreasePct; + } + } + try { final WithParentObjectId fromObjectId = fsTable.get(key); final WithParentObjectId toObjectId = tsTable.get(key); if (areKeysEqual(fromObjectId, toObjectId) || !isKeyInBucket(key, tablePrefixes, fsTable.getName())) { - // We don't have to do anything. + keysProcessed.getAndIncrement(); return; } if (fromObjectId != null) { @@ -1102,6 +1128,7 @@ void addToObjectIdMap(Table fsTable, newParentIds.ifPresent(set -> set.add(toObjectId .getParentObjectID())); } + keysProcessed.getAndIncrement(); } catch (IOException e) { throw new RuntimeException(e); } @@ -1122,7 +1149,7 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, SnapshotInfo tsInfo, boolean useFullDiff, Map tablePrefixes, - String diffDir) + String diffDir, String jobKey) throws IOException { // TODO: [SNAPSHOT] Refactor the parameter list Optional> deltaFiles = Optional.empty(); @@ -1137,10 +1164,12 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, final DifferSnapshotInfo toDSI = getDSIFromSI(tsInfo, toSnapshot, volume, bucket); + recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK); LOG.debug("Calling RocksDBCheckpointDiffer"); try { deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, diffDir).map(HashSet::new); } catch (Exception exception) { + recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF); LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. " + "It will fallback to full diff now.", exception); } @@ -1153,6 +1182,7 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, LOG.warn("RocksDBCheckpointDiffer is not available, falling back to" + " slow path"); } + recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF); ManagedRocksDB fromDB = ((RDBStore)fromSnapshot.getMetadataManager().getStore()) .getDb().getManagedRocksDb(); ManagedRocksDB toDB = ((RDBStore)toSnapshot.getMetadataManager().getStore()) @@ -1500,6 +1530,26 @@ private synchronized void updateJobStatus(String jobKey, snapDiffJobTable.put(jobKey, snapshotDiffJob); } + synchronized void recordActivity(String jobKey, + SnapshotDiffResponse.SubStatus subStatus) { + SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); + snapshotDiffJob.setSubStatus(subStatus); + snapDiffJobTable.put(jobKey, snapshotDiffJob); + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshot Diff for jobKey = {} transitions to {} state", jobKey, subStatus); + } + } + + synchronized void updateProgress(String jobKey, + double pct) { + SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); + snapshotDiffJob.setKeysProcessedPct(pct * 100); + snapDiffJobTable.put(jobKey, snapshotDiffJob); + if (LOG.isDebugEnabled()) { + LOG.debug("Completed processing {}% of keys for snapshot diff job {}", pct, jobKey); + } + } + private synchronized void updateJobStatusToFailed(String jobKey, String reason) { SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java index 8e708a4f76fd..5ad98a62754c 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java @@ -297,7 +297,8 @@ private SnapshotDiffJob addJobAndReport(JobStatus jobStatus, String jobKey = fromSnapshot + DELIMITER + toSnapshot; SnapshotDiffJob job = new SnapshotDiffJob(creationTime, jobId, jobStatus, - volume, bucket, fromSnapshot, toSnapshot, false, false, noOfEntries); + volume, bucket, fromSnapshot, toSnapshot, false, false, noOfEntries, + null, 0.0); db.get().put(jobTableCfh, codecRegistry.asRawData(jobKey), codecRegistry.asRawData(job)); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index 674c00fc4f41..793d87296f15 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -56,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyDouble; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyMap; @@ -303,7 +304,8 @@ public void init() throws RocksDBException, IOException, ExecutionException { SnapshotDiffJob diffJob = new SnapshotDiffJob(System.currentTimeMillis(), UUID.randomUUID().toString(), jobStatus, VOLUME_NAME, BUCKET_NAME, - baseSnapshotName, targetSnapshotName, false, false, 0); + baseSnapshotName, targetSnapshotName, false, false, 0, + null, 0.0); snapshotNames.add(targetSnapshotName); snapshotInfoList.add(targetSnapshot); @@ -422,6 +424,7 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) throws IOException { .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap2.toString(), snap2)); String diffDir = snapDiffDir.getAbsolutePath(); + String diffJobKey = snap1 + DELIMITER + snap2; Set randomStrings = IntStream.range(0, numberOfFiles) .mapToObj(i -> RandomStringUtils.randomAlphabetic(10)) .collect(Collectors.toSet()); @@ -448,12 +451,15 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) throws IOException { mockedRdbUtil.when(() -> RdbUtil.getSSTFilesForComparison(any(), any())) .thenReturn(Collections.singleton(RandomStringUtils.randomAlphabetic(10))); mockedRocksDiffUtils.when(() -> RocksDiffUtils.filterRelevantSstFiles(any(), any())).thenAnswer(i -> null); - Set deltaFiles = snapshotDiffManager.getDeltaFiles( + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); + Set deltaFiles = spy.getDeltaFiles( fromSnapshot, toSnapshot, Arrays.asList("cf1", "cf2"), fromSnapshotInfo, toSnapshotInfo, false, - Collections.emptyMap(), diffDir); + Collections.emptyMap(), diffDir, diffJobKey); assertEquals(randomStrings, deltaFiles); } rcFromSnapshot.close(); @@ -496,6 +502,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, }); UUID snap1 = UUID.randomUUID(); UUID snap2 = UUID.randomUUID(); + String diffJobKey = snap1 + DELIMITER + snap2; when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap1.toString()))) .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap1.toString(), snap2)); when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap2.toString()))) @@ -518,7 +525,10 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); when(jobTableIterator.isValid()).thenReturn(false); - Set deltaFiles = snapshotDiffManager.getDeltaFiles( + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); + Set deltaFiles = spy.getDeltaFiles( fromSnapshot, toSnapshot, Arrays.asList("cf1", "cf2"), @@ -526,7 +536,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, toSnapshotInfo, false, Collections.emptyMap(), - snapDiffDir.getAbsolutePath()); + snapDiffDir.getAbsolutePath(), diffJobKey); assertEquals(deltaStrings, deltaFiles); } } @@ -586,7 +596,11 @@ public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); when(jobTableIterator.isValid()).thenReturn(false); - Set deltaFiles = snapshotDiffManager.getDeltaFiles( + String diffJobKey = snap1 + DELIMITER + snap2; + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); + Set deltaFiles = spy.getDeltaFiles( fromSnapshot, toSnapshot, Arrays.asList("cf1", "cf2"), @@ -594,7 +608,7 @@ public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) toSnapshotInfo, false, Collections.emptyMap(), - snapDiffDir.getAbsolutePath()); + snapDiffDir.getAbsolutePath(), diffJobKey); assertEquals(deltaStrings, deltaFiles); rcFromSnapshot.close(); @@ -704,9 +718,8 @@ public void testObjectIdMapWithTombstoneEntries(boolean nativeLibraryLoaded, nativeLibraryLoaded, oldObjectIdKeyMap, newObjectIdKeyMap, objectIdsToCheck, Optional.of(oldParentIds), Optional.of(newParentIds), - ImmutableMap.of(OmMetadataManagerImpl.DIRECTORY_TABLE, "", - OmMetadataManagerImpl.KEY_TABLE, "", - OmMetadataManagerImpl.FILE_TABLE, "")); + ImmutableMap.of(OmMetadataManagerImpl.DIRECTORY_TABLE, "", OmMetadataManagerImpl.KEY_TABLE, "", + OmMetadataManagerImpl.FILE_TABLE, ""), ""); try (ClosableIterator> oldObjectIdIter = oldObjectIdKeyMap.iterator()) { @@ -857,7 +870,7 @@ public void testGenerateDiffReport() throws IOException { assertEquals(100, totalDiffEntries); SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId", JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, diffMap.size()); + true, diffMap.size(), null, 0.0); SnapshotDiffReportOzone snapshotDiffReportOzone = snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", "buck", "fs", "ts", @@ -918,11 +931,11 @@ public void testCreatePageResponse(int startIdx, SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId, JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, totalNumberOfRecords); + true, totalNumberOfRecords, null, 0.0); SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2, JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, totalNumberOfRecords); + true, totalNumberOfRecords, null, 0.0); db.get().put(snapDiffJobTable, codecRegistry.asRawData(testJobId), @@ -1066,8 +1079,8 @@ public void testSnapshotDiffCancelFailure(JobStatus jobStatus, String jobId = UUID.randomUUID().toString(); SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0L, - jobId, jobStatus, volumeName, bucketName, - fromSnapshotName, toSnapshotName, true, false, 10); + jobId, jobStatus, volumeName, bucketName, fromSnapshotName, toSnapshotName, + true, false, 10, null, 0.0); snapDiffJobMap.put(diffJobKey, snapshotDiffJob); @@ -1524,8 +1537,10 @@ private void setupMocksForRunningASnapDiff( @Test public void testGetDeltaFilesWithFullDiff() throws IOException { SnapshotDiffManager spy = spy(snapshotDiffManager); - OmSnapshot fromSnapshot = getMockedOmSnapshot(UUID.randomUUID()); - OmSnapshot toSnapshot = getMockedOmSnapshot(UUID.randomUUID()); + UUID snap1 = UUID.randomUUID(); + OmSnapshot fromSnapshot = getMockedOmSnapshot(snap1); + UUID snap2 = UUID.randomUUID(); + OmSnapshot toSnapshot = getMockedOmSnapshot(snap2); Mockito.doAnswer(invocation -> { OmSnapshot snapshot = invocation.getArgument(0); if (snapshot == fromSnapshot) { @@ -1537,8 +1552,11 @@ public void testGetDeltaFilesWithFullDiff() throws IOException { return Sets.newHashSet("6", "7", "8"); }).when(spy).getSSTFileListForSnapshot(Mockito.any(OmSnapshot.class), Mockito.anyList()); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); + String diffJobKey = snap1 + DELIMITER + snap2; Set deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, Collections.emptyList(), snapshotInfo, - snapshotInfo, true, Collections.emptyMap(), null); + snapshotInfo, true, Collections.emptyMap(), null, diffJobKey); Assertions.assertEquals(Sets.newHashSet("1", "2", "3", "4", "5"), deltaFiles); } @@ -1553,13 +1571,14 @@ public void testGetSnapshotDiffReportHappyCase() throws Exception { doReturn(testDeltaFiles).when(spy).getDeltaFiles(any(OmSnapshot.class), any(OmSnapshot.class), anyList(), eq(fromSnapInfo), eq(toSnapInfo), - eq(false), anyMap(), anyString()); + eq(false), anyMap(), anyString(), + anyString()); doReturn(testDeltaFiles).when(spy) .getSSTFileListForSnapshot(any(OmSnapshot.class), anyList()); doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable), - any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap()); + any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap(), anyString()); doNothing().when(spy).checkReportsIntegrity(any(), anyInt(), anyInt()); doReturn(10L).when(spy).generateDiffReport(anyString(),