From 9fdb2804904e67b98e9d1a6c7a79423fa0d90220 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Wed, 5 Mar 2025 16:01:25 +0530 Subject: [PATCH 1/6] [Snapshot] HDDS-8007. Add more detailed stages for SnapDiff job progress tracking. --- .../ozone/om/helpers/SnapshotDiffJob.java | 29 ++++++++-- .../ozone/snapshot/SnapshotDiffResponse.java | 19 +++++++ .../src/main/proto/OmClientProtocol.proto | 1 + .../om/snapshot/SnapshotDiffManager.java | 54 +++++++++++++++---- .../TestSnapshotDiffCleanupService.java | 3 +- .../om/snapshot/TestSnapshotDiffManager.java | 15 +++--- 6 files changed, 98 insertions(+), 23 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java index c3c8efc11ad6..54d5cda9bda8 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.util.List; import java.util.Objects; import org.apache.commons.lang3.StringUtils; @@ -55,6 +56,9 @@ public static Codec getCodec() { // is FAILED. private String reason; + // Represents current status of the job if the job is in IN_PROGRESS state. + private List activities; + // Default constructor for Jackson Serializer. public SnapshotDiffJob() { @@ -70,7 +74,8 @@ public SnapshotDiffJob(long creationTime, String toSnapshot, boolean forceFullDiff, boolean disableNativeDiff, - long totalDiffEntries) { + long totalDiffEntries, + List activities) { this.creationTime = creationTime; this.jobId = jobId; this.status = jobStatus; @@ -82,6 +87,7 @@ public SnapshotDiffJob(long creationTime, this.disableNativeDiff = disableNativeDiff; this.totalDiffEntries = totalDiffEntries; this.reason = StringUtils.EMPTY; + this.activities = activities; } public String getJobId() { @@ -172,6 +178,18 @@ public void disableNativeDiff(boolean disableNativeDiffVal) { this.disableNativeDiff = disableNativeDiffVal; } + public List getActivities() { + return activities; + } + + public void setActivities(List activities) { + this.activities = activities; + } + + public void addActivity(String activity){ + this.activities.add(activity); + } + @Override public String toString() { StringBuilder sb = new StringBuilder("creationTime : ").append(creationTime) @@ -209,7 +227,8 @@ public boolean equals(Object other) { Objects.equals(this.forceFullDiff, otherJob.forceFullDiff) && Objects.equals(this.totalDiffEntries, otherJob.totalDiffEntries) && Objects.equals(this.disableNativeDiff, otherJob.disableNativeDiff) - && Objects.equals(this.reason, otherJob.reason); + && Objects.equals(this.reason, otherJob.reason) && + Objects.equals(this.activities, otherJob.activities); } return false; } @@ -218,7 +237,7 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(creationTime, jobId, status, volume, bucket, fromSnapshot, toSnapshot, forceFullDiff, disableNativeDiff, - totalDiffEntries, reason); + totalDiffEntries, reason, activities); } public SnapshotDiffJobProto toProtoBuf() { @@ -233,6 +252,7 @@ public SnapshotDiffJobProto toProtoBuf() { .setForceFullDiff(forceFullDiff) .setDisableNativeDiff(disableNativeDiff) .setTotalDiffEntries(totalDiffEntries) + .addAllActivities(activities) .build(); } @@ -248,7 +268,8 @@ public static SnapshotDiffJob getFromProtoBuf( diffJobProto.getToSnapshot(), diffJobProto.getForceFullDiff(), diffJobProto.getDisableNativeDiff(), - diffJobProto.getTotalDiffEntries()); + diffJobProto.getTotalDiffEntries(), + diffJobProto.getActivitiesList()); } /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java index b4700eb83908..c1f11c39d7cb 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffResponse.JobStatusProto; +import java.util.List; /** * POJO for Snapshot Diff Response. @@ -49,6 +50,7 @@ public static JobStatus fromProtobuf(JobStatusProto jobStatusProto) { private final JobStatus jobStatus; private final long waitTimeInMs; private final String reason; + private List activities; public SnapshotDiffResponse(final SnapshotDiffReportOzone snapshotDiffReport, final JobStatus jobStatus, @@ -85,6 +87,10 @@ public String getReason() { return reason; } + public void setActivities(List activities) { + this.activities = activities; + } + @Override public String toString() { StringBuilder str = new StringBuilder(); @@ -112,6 +118,19 @@ public String toString() { .append(". Please retry after ") .append(waitTimeInMs) .append(" ms.\n"); + if (activities != null && !activities.isEmpty()) { + str.append("Activities: ["); + for (int i = 0; i < activities.size(); i++) { + String activity = activities.get(i); + str.append("{") + .append(", op : ").append(activity) + .append("}"); + if (i < activities.size() - 1) { + str.append(", "); // Add comma except for the last element + } + } + str.append("]"); + } } return str.toString(); } diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 92c2b6b4cc5a..891ae47fa5f9 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -912,6 +912,7 @@ message SnapshotDiffJobProto { optional uint64 totalDiffEntries = 9; optional string message = 10; optional bool disableNativeDiff = 11; + repeated string activities = 12; } message OzoneObj { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 8add87f0633a..c38aaca9e8c7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -88,6 +88,8 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.Date; import java.util.function.BiFunction; import java.util.stream.Stream; @@ -491,11 +493,12 @@ public SnapshotDiffResponse getSnapshotDiffReport( fromSnapshotName, toSnapshotName, index, pageSize, forceFullDiff, disableNativeDiff); case IN_PROGRESS: - return new SnapshotDiffResponse( + SnapshotDiffResponse response = new SnapshotDiffResponse( new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName, bucketName, fromSnapshotName, toSnapshotName, new ArrayList<>(), - null), - IN_PROGRESS, defaultWaitTime); + null), IN_PROGRESS, defaultWaitTime); + response.setActivities(snapDiffJob.getActivities()); + return response; case FAILED: return new SnapshotDiffResponse( new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName, @@ -753,7 +756,7 @@ private synchronized SnapshotDiffJob getSnapDiffReportStatus( String jobId = UUID.randomUUID().toString(); snapDiffJob = new SnapshotDiffJob(System.currentTimeMillis(), jobId, QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName, - forceFullDiff, disableNativeDiff, 0L); + forceFullDiff, disableNativeDiff, 0L, Collections.EMPTY_LIST); snapDiffJobTable.put(jobKey, snapDiffJob); } @@ -907,22 +910,26 @@ void generateSnapshotDiffReport(final String jobKey, // repetition while constantly checking if the job is cancelled. Callable[] methodCalls = new Callable[]{ () -> { + recordActivity(jobKey, "Candidate Key Generation :" + + " Generating the ObjectId-Key Map"); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsKeyTable, tsKeyTable, fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, performNonNativeDiff, tablePrefixes, objectIdToKeyNameMapForFromSnapshot, objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap, - oldParentIds, newParentIds, path.toString()); + oldParentIds, newParentIds, path.toString(), jobKey); return null; }, () -> { if (bucketLayout.isFileSystemOptimized()) { + recordActivity(jobKey, "Candidate Key Generation : " + + "Generating the ObjectId-Key Map for Directory Keys"); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsDirTable, tsDirTable, fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, performNonNativeDiff, tablePrefixes, objectIdToKeyNameMapForFromSnapshot, objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap, - oldParentIds, newParentIds, path.toString()); + oldParentIds, newParentIds, path.toString(), jobKey); } return null; }, @@ -945,6 +952,8 @@ void generateSnapshotDiffReport(final String jobKey, return null; }, () -> { + recordActivity(jobKey, "Final Stage : " + + "Generating and storing the Diff Report"); long totalDiffEntries = generateDiffReport(jobId, fsKeyTable, tsKeyTable, @@ -1021,10 +1030,10 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( final PersistentMap objectIdToIsDirMap, final Optional> oldParentIds, final Optional> newParentIds, - final String diffDir) throws IOException, RocksDBException { + final String diffDir, final String jobKey) throws IOException, RocksDBException { List tablesToLookUp = Collections.singletonList(fsTable.getName()); - + recordActivity(jobKey, "Computing Delta SST File Set"); Set deltaFiles = getDeltaFiles(fromSnapshot, toSnapshot, tablesToLookUp, fsInfo, tsInfo, useFullDiff, tablePrefixes, diffDir); @@ -1037,10 +1046,12 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes, fromDB); deltaFiles.addAll(inputFiles); } + recordActivity(jobKey, "Computed Delta SST File Set, Total count = " + + deltaFiles.size()); addToObjectIdMap(fsTable, tsTable, deltaFiles, !skipNativeDiff && isNativeLibsLoaded, oldObjIdToKeyMap, newObjIdToKeyMap, objectIdToIsDirMap, oldParentIds, - newParentIds, tablePrefixes); + newParentIds, tablePrefixes, jobKey); } @VisibleForTesting @@ -1053,7 +1064,7 @@ void addToObjectIdMap(Table fsTable, PersistentMap objectIdToIsDirMap, Optional> oldParentIds, Optional> newParentIds, - Map tablePrefixes) throws IOException, RocksDBException { + Map tablePrefixes, String jobKey) throws IOException, RocksDBException { if (deltaFiles.isEmpty()) { return; } @@ -1064,6 +1075,7 @@ void addToObjectIdMap(Table fsTable, validateEstimatedKeyChangesAreInLimits(sstFileReader); String sstFileReaderLowerBound = tablePrefix; String sstFileReaderUpperBound = null; + int stepIncrease = 100; if (Strings.isNotEmpty(tablePrefix)) { char[] upperBoundCharArray = tablePrefix.toCharArray(); upperBoundCharArray[upperBoundCharArray.length - 1] += 1; @@ -1072,13 +1084,19 @@ void addToObjectIdMap(Table fsTable, try (Stream keysToCheck = nativeRocksToolsLoaded ? sstFileReader.getKeyStreamWithTombstone(sstFileReaderLowerBound, sstFileReaderUpperBound) : sstFileReader.getKeyStream(sstFileReaderLowerBound, sstFileReaderUpperBound)) { + AtomicLong keysProcessed = new AtomicLong(0); keysToCheck.forEach(key -> { + if (keysProcessed.get() % stepIncrease == 0) { + String status = "Completed processing " + keysProcessed + " number of keys"; + recordActivity(jobKey, status); + } + try { final WithParentObjectId fromObjectId = fsTable.get(key); final WithParentObjectId toObjectId = tsTable.get(key); if (areKeysEqual(fromObjectId, toObjectId) || !isKeyInBucket(key, tablePrefixes, fsTable.getName())) { - // We don't have to do anything. + keysProcessed.getAndIncrement(); return; } if (fromObjectId != null) { @@ -1104,6 +1122,7 @@ void addToObjectIdMap(Table fsTable, newParentIds.ifPresent(set -> set.add(toObjectId .getParentObjectID())); } + keysProcessed.getAndIncrement(); } catch (IOException e) { throw new RuntimeException(e); } @@ -1524,6 +1543,19 @@ private synchronized void updateJobStatus(String jobKey, snapDiffJobTable.put(jobKey, snapshotDiffJob); } + private synchronized void recordActivity(String jobKey, + String statusText) { + SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); + String timestamp = getTimeStampString() + ""; + snapshotDiffJob.addActivity(timestamp + " : " + statusText); + snapDiffJobTable.put(jobKey, snapshotDiffJob); + } + + private static String getTimeStampString() { + Date date = new Date(System.currentTimeMillis()); + return date.toString(); + } + private synchronized void updateJobStatusToFailed(String jobKey, String reason) { SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java index dfbc6a3b9442..04b4690f746c 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java @@ -299,7 +299,8 @@ private SnapshotDiffJob addJobAndReport(JobStatus jobStatus, String jobKey = fromSnapshot + DELIMITER + toSnapshot; SnapshotDiffJob job = new SnapshotDiffJob(creationTime, jobId, jobStatus, - volume, bucket, fromSnapshot, toSnapshot, false, false, noOfEntries); + volume, bucket, fromSnapshot, toSnapshot, false, false, noOfEntries, + new ArrayList<>()); db.get().put(jobTableCfh, codecRegistry.asRawData(jobKey), codecRegistry.asRawData(job)); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index 037e54d00085..20e47aad735d 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -303,7 +303,8 @@ public void init() throws RocksDBException, IOException, ExecutionException { SnapshotDiffJob diffJob = new SnapshotDiffJob(System.currentTimeMillis(), UUID.randomUUID().toString(), jobStatus, VOLUME_NAME, BUCKET_NAME, - baseSnapshotName, targetSnapshotName, false, false, 0); + baseSnapshotName, targetSnapshotName, false, false, 0, + new ArrayList<>()); snapshotNames.add(targetSnapshotName); snapshotInfoList.add(targetSnapshot); @@ -705,7 +706,7 @@ public void testObjectIdMapWithTombstoneEntries(boolean nativeLibraryLoaded, Optional.of(newParentIds), ImmutableMap.of(OmMetadataManagerImpl.DIRECTORY_TABLE, "", OmMetadataManagerImpl.KEY_TABLE, "", - OmMetadataManagerImpl.FILE_TABLE, "")); + OmMetadataManagerImpl.FILE_TABLE, ""),""); try (ClosableIterator> oldObjectIdIter = oldObjectIdKeyMap.iterator()) { @@ -856,7 +857,7 @@ public void testGenerateDiffReport() throws IOException { assertEquals(100, totalDiffEntries); SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId", JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, diffMap.size()); + true, diffMap.size(), new ArrayList<>()); SnapshotDiffReportOzone snapshotDiffReportOzone = snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", "buck", "fs", "ts", @@ -917,11 +918,11 @@ public void testCreatePageResponse(int startIdx, SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId, JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, totalNumberOfRecords); + true, totalNumberOfRecords, new ArrayList<>()); SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2, JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, totalNumberOfRecords); + true, totalNumberOfRecords, new ArrayList<>()); db.get().put(snapDiffJobTable, codecRegistry.asRawData(testJobId), @@ -1066,7 +1067,7 @@ public void testSnapshotDiffCancelFailure(JobStatus jobStatus, String jobId = UUID.randomUUID().toString(); SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0L, jobId, jobStatus, volumeName, bucketName, - fromSnapshotName, toSnapshotName, true, false, 10); + fromSnapshotName, toSnapshotName, true, false, 10, new ArrayList<>()); snapDiffJobMap.put(diffJobKey, snapshotDiffJob); @@ -1558,7 +1559,7 @@ public void testGetSnapshotDiffReportHappyCase() throws Exception { .getSSTFileListForSnapshot(any(OmSnapshot.class), anyList()); doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable), - any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap()); + any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap(),anyString()); doNothing().when(spy).checkReportsIntegrity(any(), anyInt(), anyInt()); doReturn(10L).when(spy).generateDiffReport(anyString(), From 27e751a274583f6e097543ae6acbef5167ba0023 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 10 Mar 2025 19:16:24 +0530 Subject: [PATCH 2/6] use enums --- .../ozone/om/helpers/SnapshotDiffJob.java | 50 +++++++++----- .../ozone/snapshot/SnapshotDiffResponse.java | 47 ++++++++----- .../src/main/proto/OmClientProtocol.proto | 12 +++- .../om/snapshot/SnapshotDiffManager.java | 67 ++++++++++++------- .../TestSnapshotDiffCleanupService.java | 2 +- .../om/snapshot/TestSnapshotDiffManager.java | 30 +++++---- 6 files changed, 138 insertions(+), 70 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java index edf30bcc20e9..44856e757620 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java @@ -21,12 +21,12 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.List; import java.util.Objects; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.utils.db.Codec; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffJobProto; import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus; /** * POJO for Snapshot diff job. @@ -55,8 +55,12 @@ public static Codec getCodec() { // is FAILED. private String reason; - // Represents current status of the job if the job is in IN_PROGRESS state. - private List activities; + // Represents current sub-status of the job if the job is in IN_PROGRESS state. + private SubStatus subStatus; + // percentage of keys processed in the Object-ID generation phase. + // This is the most time-consuming phase as it loads both snapshots + // and reads from it populating the ObjectID-key map. + double keysProcessedPct; // Default constructor for Jackson Serializer. public SnapshotDiffJob() { @@ -74,7 +78,8 @@ public SnapshotDiffJob(long creationTime, boolean forceFullDiff, boolean disableNativeDiff, long totalDiffEntries, - List activities) { + SubStatus subStatus, + double keysProcessedPct) { this.creationTime = creationTime; this.jobId = jobId; this.status = jobStatus; @@ -86,7 +91,8 @@ public SnapshotDiffJob(long creationTime, this.disableNativeDiff = disableNativeDiff; this.totalDiffEntries = totalDiffEntries; this.reason = StringUtils.EMPTY; - this.activities = activities; + this.subStatus = subStatus; + this.keysProcessedPct = keysProcessedPct; } public String getJobId() { @@ -177,16 +183,20 @@ public void disableNativeDiff(boolean disableNativeDiffVal) { this.disableNativeDiff = disableNativeDiffVal; } - public List getActivities() { - return activities; + public SubStatus getSubStatus() { + return subStatus; } - public void setActivities(List activities) { - this.activities = activities; + public void setSubStatus(SubStatus subStatus) { + this.subStatus = subStatus; } - public void addActivity(String activity){ - this.activities.add(activity); + public double getKeysProcessedPct() { + return keysProcessedPct; + } + + public void setKeysProcessedPct(double keysProcessedPct) { + this.keysProcessedPct = keysProcessedPct; } @Override @@ -205,6 +215,13 @@ public String toString() { if (StringUtils.isNotEmpty(reason)) { sb.append(", reason: ").append(reason); } + if (status.equals(JobStatus.IN_PROGRESS) && subStatus != null) { + sb.append(", subStatus: ").append(status); + if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO) + || subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS)){ + sb.append(", keysProcessedPercent"); + } + } return sb.toString(); } @@ -227,7 +244,8 @@ public boolean equals(Object other) { Objects.equals(this.totalDiffEntries, otherJob.totalDiffEntries) && Objects.equals(this.disableNativeDiff, otherJob.disableNativeDiff) && Objects.equals(this.reason, otherJob.reason) && - Objects.equals(this.activities, otherJob.activities); + Objects.equals(this.subStatus, otherJob.subStatus) && + Objects.equals(this.keysProcessedPct, otherJob.keysProcessedPct); } return false; } @@ -236,7 +254,7 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(creationTime, jobId, status, volume, bucket, fromSnapshot, toSnapshot, forceFullDiff, disableNativeDiff, - totalDiffEntries, reason, activities); + totalDiffEntries, reason, subStatus, keysProcessedPct); } public SnapshotDiffJobProto toProtoBuf() { @@ -251,7 +269,8 @@ public SnapshotDiffJobProto toProtoBuf() { .setForceFullDiff(forceFullDiff) .setDisableNativeDiff(disableNativeDiff) .setTotalDiffEntries(totalDiffEntries) - .addAllActivities(activities) + .setSubStatus(subStatus.toProtoBuf()) + .setKeysProcessedPct(keysProcessedPct) .build(); } @@ -268,7 +287,8 @@ public static SnapshotDiffJob getFromProtoBuf( diffJobProto.getForceFullDiff(), diffJobProto.getDisableNativeDiff(), diffJobProto.getTotalDiffEntries(), - diffJobProto.getActivitiesList()); + SubStatus.fromProtoBuf(diffJobProto.getSubStatus()), + diffJobProto.getKeysProcessedPct()); } /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java index 81c1e74d5335..4bf2ce03c599 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java @@ -18,8 +18,8 @@ package org.apache.hadoop.ozone.snapshot; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffResponse.JobStatusProto; -import java.util.List; /** * POJO for Snapshot Diff Response. @@ -46,11 +46,28 @@ public static JobStatus fromProtobuf(JobStatusProto jobStatusProto) { } } + public enum SubStatus { + SST_FILE_DELTA_DAG_WALK, + SST_FILE_DELTA_FULL_DIFF, + OBJECT_ID_MAP_GEN_OBS, + OBJECT_ID_MAP_GEN_FSO, + DIFF_REPORT_GEN; + + public static SubStatus fromProtoBuf(OzoneManagerProtocolProtos.SnapshotDiffResponse.SubStatus subStatusProto) { + return SubStatus.valueOf(subStatusProto.name()); + } + + public OzoneManagerProtocolProtos.SnapshotDiffResponse.SubStatus toProtoBuf() { + return OzoneManagerProtocolProtos.SnapshotDiffResponse.SubStatus.valueOf(this.name()); + } + } + private final SnapshotDiffReportOzone snapshotDiffReport; private final JobStatus jobStatus; private final long waitTimeInMs; private final String reason; - private List activities; + private SubStatus subStatus; + private double progressPercent = 0.0; public SnapshotDiffResponse(final SnapshotDiffReportOzone snapshotDiffReport, final JobStatus jobStatus, @@ -87,8 +104,12 @@ public String getReason() { return reason; } - public void setActivities(List activities) { - this.activities = activities; + public void setSubStatus(SubStatus subStatus) { + this.subStatus = subStatus; + } + + public void setProgressPercent(double progressPercent) { + this.progressPercent = progressPercent; } @Override @@ -118,18 +139,14 @@ public String toString() { .append(". Please retry after ") .append(waitTimeInMs) .append(" ms.\n"); - if (activities != null && !activities.isEmpty()) { - str.append("Activities: ["); - for (int i = 0; i < activities.size(); i++) { - String activity = activities.get(i); - str.append("{") - .append(", op : ").append(activity) - .append("}"); - if (i < activities.size() - 1) { - str.append(", "); // Add comma except for the last element - } + if (subStatus != null) { + str.append("SubStatus is ") + .append(subStatus); + if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS) || + subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO)) { + str.append("Keys Processed Estimated Percentage : ") + .append(progressPercent); } - str.append("]"); } } return str.toString(); diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index cff90eede113..7c6afa04078d 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -912,7 +912,8 @@ message SnapshotDiffJobProto { optional uint64 totalDiffEntries = 9; optional string message = 10; optional bool disableNativeDiff = 11; - repeated string activities = 12; + optional SnapshotDiffResponse.SubStatus subStatus = 12; + optional double keysProcessedPct = 13; } message OzoneObj { @@ -2102,10 +2103,19 @@ message SnapshotDiffResponse { CANCELLED = 6; } + enum SubStatus { + SST_FILE_DELTA_DAG_WALK = 1; + SST_FILE_DELTA_FULL_DIFF = 2; + OBJECT_ID_MAP_GEN_OBS = 3; + OBJECT_ID_MAP_GEN_FSO = 4; + DIFF_REPORT_GEN = 5; + } + optional SnapshotDiffReportProto snapshotDiffReport = 1; optional JobStatusProto jobStatus = 2; optional int64 waitTimeInMs = 3; optional string reason = 4; + optional SubStatus subStatus = 5; } message CancelSnapshotDiffResponse { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 8d2618d79ce0..1e911b6577de 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -55,6 +55,7 @@ import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -492,10 +493,11 @@ public SnapshotDiffResponse getSnapshotDiffReport( disableNativeDiff); case IN_PROGRESS: SnapshotDiffResponse response = new SnapshotDiffResponse( - new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName, - bucketName, fromSnapshotName, toSnapshotName, new ArrayList<>(), - null), IN_PROGRESS, defaultWaitTime); - response.setActivities(snapDiffJob.getActivities()); + new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName, bucketName, + fromSnapshotName, toSnapshotName, + new ArrayList<>(), null), IN_PROGRESS, defaultWaitTime); + response.setSubStatus(snapDiffJob.getSubStatus()); + response.setProgressPercent(snapDiffJob.getKeysProcessedPct()); return response; case FAILED: return new SnapshotDiffResponse( @@ -754,7 +756,7 @@ private synchronized SnapshotDiffJob getSnapDiffReportStatus( String jobId = UUID.randomUUID().toString(); snapDiffJob = new SnapshotDiffJob(System.currentTimeMillis(), jobId, QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName, - forceFullDiff, disableNativeDiff, 0L, Collections.EMPTY_LIST); + forceFullDiff, disableNativeDiff, 0L, null,0.0); snapDiffJobTable.put(jobKey, snapDiffJob); } @@ -908,8 +910,7 @@ void generateSnapshotDiffReport(final String jobKey, // repetition while constantly checking if the job is cancelled. Callable[] methodCalls = new Callable[]{ () -> { - recordActivity(jobKey, "Candidate Key Generation :" + - " Generating the ObjectId-Key Map"); + recordActivity(jobKey, OBJECT_ID_MAP_GEN_OBS); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsKeyTable, tsKeyTable, fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, performNonNativeDiff, tablePrefixes, @@ -920,8 +921,7 @@ void generateSnapshotDiffReport(final String jobKey, }, () -> { if (bucketLayout.isFileSystemOptimized()) { - recordActivity(jobKey, "Candidate Key Generation : " + - "Generating the ObjectId-Key Map for Directory Keys"); + recordActivity(jobKey, OBJECT_ID_MAP_GEN_FSO); getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsDirTable, tsDirTable, fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff, performNonNativeDiff, tablePrefixes, @@ -950,8 +950,7 @@ void generateSnapshotDiffReport(final String jobKey, return null; }, () -> { - recordActivity(jobKey, "Final Stage : " + - "Generating and storing the Diff Report"); + recordActivity(jobKey, DIFF_REPORT_GEN); long totalDiffEntries = generateDiffReport(jobId, fsKeyTable, tsKeyTable, @@ -1031,9 +1030,8 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( final String diffDir, final String jobKey) throws IOException, RocksDBException { List tablesToLookUp = Collections.singletonList(fsTable.getName()); - recordActivity(jobKey, "Computing Delta SST File Set"); Set deltaFiles = getDeltaFiles(fromSnapshot, toSnapshot, - tablesToLookUp, fsInfo, tsInfo, useFullDiff, tablePrefixes, diffDir); + tablesToLookUp, fsInfo, tsInfo, useFullDiff, tablePrefixes, diffDir, jobKey); // Workaround to handle deletes if native rocksDb tool for reading // tombstone is not loaded. @@ -1044,8 +1042,9 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes, fromDB); deltaFiles.addAll(inputFiles); } - recordActivity(jobKey, "Computed Delta SST File Set, Total count = " + - deltaFiles.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("Computed Delta SST File Set, Total count = {} ", deltaFiles.size()); + } addToObjectIdMap(fsTable, tsTable, deltaFiles, !skipNativeDiff && isNativeLibsLoaded, oldObjIdToKeyMap, newObjIdToKeyMap, objectIdToIsDirMap, oldParentIds, @@ -1071,9 +1070,12 @@ void addToObjectIdMap(Table fsTable, fsTable.getName().equals(DIRECTORY_TABLE); SstFileSetReader sstFileReader = new SstFileSetReader(deltaFiles); validateEstimatedKeyChangesAreInLimits(sstFileReader); + long totalEstimatedKeysToProcess = sstFileReader.getEstimatedTotalKeys(); String sstFileReaderLowerBound = tablePrefix; String sstFileReaderUpperBound = null; - int stepIncrease = 100; + double stepIncreasePct = 0.1; + double[] checkpoint = new double[1]; + checkpoint[0] = stepIncreasePct; if (Strings.isNotEmpty(tablePrefix)) { char[] upperBoundCharArray = tablePrefix.toCharArray(); upperBoundCharArray[upperBoundCharArray.length - 1] += 1; @@ -1084,9 +1086,12 @@ void addToObjectIdMap(Table fsTable, : sstFileReader.getKeyStream(sstFileReaderLowerBound, sstFileReaderUpperBound)) { AtomicLong keysProcessed = new AtomicLong(0); keysToCheck.forEach(key -> { - if (keysProcessed.get() % stepIncrease == 0) { - String status = "Completed processing " + keysProcessed + " number of keys"; - recordActivity(jobKey, status); + if (totalEstimatedKeysToProcess > 0) { + double progressPct = (double) keysProcessed.get() / totalEstimatedKeysToProcess; + if (progressPct >= checkpoint[0]) { + updateProgress(jobKey, progressPct); + checkpoint[0] += stepIncreasePct; + } } try { @@ -1141,7 +1146,7 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, SnapshotInfo tsInfo, boolean useFullDiff, Map tablePrefixes, - String diffDir) + String diffDir, String jobKey) throws IOException { // TODO: [SNAPSHOT] Refactor the parameter list Optional> deltaFiles = Optional.empty(); @@ -1156,10 +1161,12 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, final DifferSnapshotInfo toDSI = getDSIFromSI(tsInfo, toSnapshot, volume, bucket); + recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK); LOG.debug("Calling RocksDBCheckpointDiffer"); try { deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI, diffDir).map(HashSet::new); } catch (Exception exception) { + recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF); LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. " + "It will fallback to full diff now.", exception); } @@ -1172,6 +1179,7 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, LOG.warn("RocksDBCheckpointDiffer is not available, falling back to" + " slow path"); } + recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF); ManagedRocksDB fromDB = ((RDBStore)fromSnapshot.getMetadataManager().getStore()) .getDb().getManagedRocksDb(); ManagedRocksDB toDB = ((RDBStore)toSnapshot.getMetadataManager().getStore()) @@ -1520,16 +1528,23 @@ private synchronized void updateJobStatus(String jobKey, } private synchronized void recordActivity(String jobKey, - String statusText) { + SnapshotDiffResponse.SubStatus subStatus) { SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); - String timestamp = getTimeStampString() + ""; - snapshotDiffJob.addActivity(timestamp + " : " + statusText); + snapshotDiffJob.setSubStatus(subStatus); snapDiffJobTable.put(jobKey, snapshotDiffJob); + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshot Diff for jobKey = {} transitions to {} state", jobKey, subStatus); + } } - private static String getTimeStampString() { - Date date = new Date(System.currentTimeMillis()); - return date.toString(); + private synchronized void updateProgress(String jobKey, + double pct) { + SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); + snapshotDiffJob.setKeysProcessedPct(pct * 100); + snapDiffJobTable.put(jobKey, snapshotDiffJob); + if (LOG.isDebugEnabled()){ + LOG.debug("Completed processing {}% of keys for snapshot diff job {}", pct, jobKey); + } } private synchronized void updateJobStatusToFailed(String jobKey, diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java index dc980c3bf85b..5ad98a62754c 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java @@ -298,7 +298,7 @@ private SnapshotDiffJob addJobAndReport(JobStatus jobStatus, SnapshotDiffJob job = new SnapshotDiffJob(creationTime, jobId, jobStatus, volume, bucket, fromSnapshot, toSnapshot, false, false, noOfEntries, - new ArrayList<>()); + null, 0.0); db.get().put(jobTableCfh, codecRegistry.asRawData(jobKey), codecRegistry.asRawData(job)); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index bf1c20b4b367..1a4ee6873999 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -304,7 +304,7 @@ public void init() throws RocksDBException, IOException, ExecutionException { SnapshotDiffJob diffJob = new SnapshotDiffJob(System.currentTimeMillis(), UUID.randomUUID().toString(), jobStatus, VOLUME_NAME, BUCKET_NAME, baseSnapshotName, targetSnapshotName, false, false, 0, - new ArrayList<>()); + null, 0.0); snapshotNames.add(targetSnapshotName); snapshotInfoList.add(targetSnapshot); @@ -423,6 +423,7 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) throws IOException { .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap2.toString(), snap2)); String diffDir = snapDiffDir.getAbsolutePath(); + String diffJobKey = snap1 + DELIMITER + snap2; Set randomStrings = IntStream.range(0, numberOfFiles) .mapToObj(i -> RandomStringUtils.randomAlphabetic(10)) .collect(Collectors.toSet()); @@ -454,7 +455,7 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) throws IOException { toSnapshot, Arrays.asList("cf1", "cf2"), fromSnapshotInfo, toSnapshotInfo, false, - Collections.emptyMap(), diffDir); + Collections.emptyMap(), diffDir, diffJobKey); assertEquals(randomStrings, deltaFiles); } rcFromSnapshot.close(); @@ -497,6 +498,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, }); UUID snap1 = UUID.randomUUID(); UUID snap2 = UUID.randomUUID(); + String diffJobKey = snap1 + DELIMITER + snap2; when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap1.toString()))) .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap1.toString(), snap2)); when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap2.toString()))) @@ -527,7 +529,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, toSnapshotInfo, false, Collections.emptyMap(), - snapDiffDir.getAbsolutePath()); + snapDiffDir.getAbsolutePath(),diffJobKey); assertEquals(deltaStrings, deltaFiles); } } @@ -587,6 +589,7 @@ public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); when(jobTableIterator.isValid()).thenReturn(false); + String diffJobKey = snap1 + DELIMITER + snap2; Set deltaFiles = snapshotDiffManager.getDeltaFiles( fromSnapshot, toSnapshot, @@ -595,7 +598,7 @@ public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) toSnapshotInfo, false, Collections.emptyMap(), - snapDiffDir.getAbsolutePath()); + snapDiffDir.getAbsolutePath(), diffJobKey); assertEquals(deltaStrings, deltaFiles); rcFromSnapshot.close(); @@ -858,7 +861,7 @@ public void testGenerateDiffReport() throws IOException { assertEquals(100, totalDiffEntries); SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId", JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, diffMap.size(), new ArrayList<>()); + true, diffMap.size(), null, 0.0); SnapshotDiffReportOzone snapshotDiffReportOzone = snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol", "buck", "fs", "ts", @@ -919,11 +922,11 @@ public void testCreatePageResponse(int startIdx, SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId, JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, totalNumberOfRecords, new ArrayList<>()); + true, totalNumberOfRecords, null, 0.0); SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2, JobStatus.DONE, "vol", "buck", "fs", "ts", false, - true, totalNumberOfRecords, new ArrayList<>()); + true, totalNumberOfRecords, null, 0.0); db.get().put(snapDiffJobTable, codecRegistry.asRawData(testJobId), @@ -1068,7 +1071,7 @@ public void testSnapshotDiffCancelFailure(JobStatus jobStatus, String jobId = UUID.randomUUID().toString(); SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0L, jobId, jobStatus, volumeName, bucketName, - fromSnapshotName, toSnapshotName, true, false, 10, new ArrayList<>()); + fromSnapshotName, toSnapshotName, true, false, 10, null,0.0); snapDiffJobMap.put(diffJobKey, snapshotDiffJob); @@ -1525,8 +1528,10 @@ private void setupMocksForRunningASnapDiff( @Test public void testGetDeltaFilesWithFullDiff() throws IOException { SnapshotDiffManager spy = spy(snapshotDiffManager); - OmSnapshot fromSnapshot = getMockedOmSnapshot(UUID.randomUUID()); - OmSnapshot toSnapshot = getMockedOmSnapshot(UUID.randomUUID()); + UUID snap1 = UUID.randomUUID(); + OmSnapshot fromSnapshot = getMockedOmSnapshot(snap1); + UUID snap2 = UUID.randomUUID(); + OmSnapshot toSnapshot = getMockedOmSnapshot(snap2); Mockito.doAnswer(invocation -> { OmSnapshot snapshot = invocation.getArgument(0); if (snapshot == fromSnapshot) { @@ -1538,8 +1543,9 @@ public void testGetDeltaFilesWithFullDiff() throws IOException { return Sets.newHashSet("6", "7", "8"); }).when(spy).getSSTFileListForSnapshot(Mockito.any(OmSnapshot.class), Mockito.anyList()); + String diffJobKey = snap1 + DELIMITER + snap2; Set deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, Collections.emptyList(), snapshotInfo, - snapshotInfo, true, Collections.emptyMap(), null); + snapshotInfo, true, Collections.emptyMap(), null, diffJobKey); Assertions.assertEquals(Sets.newHashSet("1", "2", "3", "4", "5"), deltaFiles); } @@ -1554,7 +1560,7 @@ public void testGetSnapshotDiffReportHappyCase() throws Exception { doReturn(testDeltaFiles).when(spy).getDeltaFiles(any(OmSnapshot.class), any(OmSnapshot.class), anyList(), eq(fromSnapInfo), eq(toSnapInfo), - eq(false), anyMap(), anyString()); + eq(false), anyMap(), anyString(),anyString()); doReturn(testDeltaFiles).when(spy) .getSSTFileListForSnapshot(any(OmSnapshot.class), anyList()); From 16eb6389ad62edb79ef95690d05fc84d36e1aa41 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 10 Mar 2025 19:28:22 +0530 Subject: [PATCH 3/6] fix checkstyle --- .../hadoop/ozone/om/helpers/SnapshotDiffJob.java | 6 +++--- .../ozone/snapshot/SnapshotDiffResponse.java | 4 +++- .../ozone/om/snapshot/SnapshotDiffManager.java | 15 +++++++++------ .../om/snapshot/TestSnapshotDiffManager.java | 16 ++++++++-------- 4 files changed, 23 insertions(+), 18 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java index 44856e757620..f31ca5b6b2ba 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java @@ -60,7 +60,7 @@ public static Codec getCodec() { // percentage of keys processed in the Object-ID generation phase. // This is the most time-consuming phase as it loads both snapshots // and reads from it populating the ObjectID-key map. - double keysProcessedPct; + private double keysProcessedPct; // Default constructor for Jackson Serializer. public SnapshotDiffJob() { @@ -217,8 +217,8 @@ public String toString() { } if (status.equals(JobStatus.IN_PROGRESS) && subStatus != null) { sb.append(", subStatus: ").append(status); - if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO) - || subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS)){ + if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO) || + subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS)) { sb.append(", keysProcessedPercent"); } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java index 4bf2ce03c599..9b35f4c5e858 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java @@ -45,7 +45,9 @@ public static JobStatus fromProtobuf(JobStatusProto jobStatusProto) { return JobStatus.valueOf(jobStatusProto.name()); } } - + /** + * Snapshot diff job sub-status enum. + */ public enum SubStatus { SST_FILE_DELTA_DAG_WALK, SST_FILE_DELTA_FULL_DIFF, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 1e911b6577de..c6c2a197b30e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -55,7 +55,11 @@ import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED; -import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.*; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.DIFF_REPORT_GEN; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_FSO; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_OBS; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_DAG_WALK; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_FULL_DIFF; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -71,7 +75,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,12 +83,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.stream.Stream; import org.apache.commons.io.file.PathUtils; @@ -755,8 +758,8 @@ private synchronized SnapshotDiffJob getSnapDiffReportStatus( if (snapDiffJob == null) { String jobId = UUID.randomUUID().toString(); snapDiffJob = new SnapshotDiffJob(System.currentTimeMillis(), jobId, - QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName, - forceFullDiff, disableNativeDiff, 0L, null,0.0); + QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName, forceFullDiff, + disableNativeDiff, 0L, null, 0.0); snapDiffJobTable.put(jobKey, snapDiffJob); } @@ -1542,7 +1545,7 @@ private synchronized void updateProgress(String jobKey, SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); snapshotDiffJob.setKeysProcessedPct(pct * 100); snapDiffJobTable.put(jobKey, snapshotDiffJob); - if (LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug("Completed processing {}% of keys for snapshot diff job {}", pct, jobKey); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index 1a4ee6873999..235b8918052f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -529,7 +529,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, toSnapshotInfo, false, Collections.emptyMap(), - snapDiffDir.getAbsolutePath(),diffJobKey); + snapDiffDir.getAbsolutePath(), diffJobKey); assertEquals(deltaStrings, deltaFiles); } } @@ -708,9 +708,8 @@ public void testObjectIdMapWithTombstoneEntries(boolean nativeLibraryLoaded, nativeLibraryLoaded, oldObjectIdKeyMap, newObjectIdKeyMap, objectIdsToCheck, Optional.of(oldParentIds), Optional.of(newParentIds), - ImmutableMap.of(OmMetadataManagerImpl.DIRECTORY_TABLE, "", - OmMetadataManagerImpl.KEY_TABLE, "", - OmMetadataManagerImpl.FILE_TABLE, ""),""); + ImmutableMap.of(OmMetadataManagerImpl.DIRECTORY_TABLE, "", OmMetadataManagerImpl.KEY_TABLE, "", + OmMetadataManagerImpl.FILE_TABLE, ""), ""); try (ClosableIterator> oldObjectIdIter = oldObjectIdKeyMap.iterator()) { @@ -1070,8 +1069,8 @@ public void testSnapshotDiffCancelFailure(JobStatus jobStatus, String jobId = UUID.randomUUID().toString(); SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0L, - jobId, jobStatus, volumeName, bucketName, - fromSnapshotName, toSnapshotName, true, false, 10, null,0.0); + jobId, jobStatus, volumeName, bucketName, fromSnapshotName, toSnapshotName, + true, false, 10, null, 0.0); snapDiffJobMap.put(diffJobKey, snapshotDiffJob); @@ -1560,13 +1559,14 @@ public void testGetSnapshotDiffReportHappyCase() throws Exception { doReturn(testDeltaFiles).when(spy).getDeltaFiles(any(OmSnapshot.class), any(OmSnapshot.class), anyList(), eq(fromSnapInfo), eq(toSnapInfo), - eq(false), anyMap(), anyString(),anyString()); + eq(false), anyMap(), anyString(), + anyString()); doReturn(testDeltaFiles).when(spy) .getSSTFileListForSnapshot(any(OmSnapshot.class), anyList()); doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable), - any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap(),anyString()); + any(), anyBoolean(), any(), any(), any(), any(), any(), anyMap(), anyString()); doNothing().when(spy).checkReportsIntegrity(any(), anyInt(), anyInt()); doReturn(10L).when(spy).generateDiffReport(anyString(), From 3bdb7910f6de596be3b4666cb9e375213afe1dd3 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 10 Mar 2025 21:36:15 +0530 Subject: [PATCH 4/6] fix test --- .../ozone/om/snapshot/SnapshotDiffManager.java | 4 ++-- .../om/snapshot/TestSnapshotDiffManager.java | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index c6c2a197b30e..17199debbd47 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -1530,7 +1530,7 @@ private synchronized void updateJobStatus(String jobKey, snapDiffJobTable.put(jobKey, snapshotDiffJob); } - private synchronized void recordActivity(String jobKey, + synchronized void recordActivity(String jobKey, SnapshotDiffResponse.SubStatus subStatus) { SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); snapshotDiffJob.setSubStatus(subStatus); @@ -1540,7 +1540,7 @@ private synchronized void recordActivity(String jobKey, } } - private synchronized void updateProgress(String jobKey, + synchronized void updateProgress(String jobKey, double pct) { SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey); snapshotDiffJob.setKeysProcessedPct(pct * 100); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index 235b8918052f..793d87296f15 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -56,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyDouble; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyMap; @@ -450,7 +451,10 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) throws IOException { mockedRdbUtil.when(() -> RdbUtil.getSSTFilesForComparison(any(), any())) .thenReturn(Collections.singleton(RandomStringUtils.randomAlphabetic(10))); mockedRocksDiffUtils.when(() -> RocksDiffUtils.filterRelevantSstFiles(any(), any())).thenAnswer(i -> null); - Set deltaFiles = snapshotDiffManager.getDeltaFiles( + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); + Set deltaFiles = spy.getDeltaFiles( fromSnapshot, toSnapshot, Arrays.asList("cf1", "cf2"), fromSnapshotInfo, @@ -521,7 +525,10 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1); SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); when(jobTableIterator.isValid()).thenReturn(false); - Set deltaFiles = snapshotDiffManager.getDeltaFiles( + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); + Set deltaFiles = spy.getDeltaFiles( fromSnapshot, toSnapshot, Arrays.asList("cf1", "cf2"), @@ -590,7 +597,10 @@ public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1); when(jobTableIterator.isValid()).thenReturn(false); String diffJobKey = snap1 + DELIMITER + snap2; - Set deltaFiles = snapshotDiffManager.getDeltaFiles( + SnapshotDiffManager spy = spy(snapshotDiffManager); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); + Set deltaFiles = spy.getDeltaFiles( fromSnapshot, toSnapshot, Arrays.asList("cf1", "cf2"), @@ -1542,6 +1552,8 @@ public void testGetDeltaFilesWithFullDiff() throws IOException { return Sets.newHashSet("6", "7", "8"); }).when(spy).getSSTFileListForSnapshot(Mockito.any(OmSnapshot.class), Mockito.anyList()); + doNothing().when(spy).recordActivity(any(), any()); + doNothing().when(spy).updateProgress(anyString(), anyDouble()); String diffJobKey = snap1 + DELIMITER + snap2; Set deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, Collections.emptyList(), snapshotInfo, snapshotInfo, true, Collections.emptyMap(), null, diffJobKey); From 3856a46aa9be32feb798d323587186b6afaeddb8 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Tue, 11 Mar 2025 18:31:08 +0530 Subject: [PATCH 5/6] fix toString --- .../org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java | 2 +- .../org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java index f31ca5b6b2ba..5e45d26e05c6 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java @@ -219,7 +219,7 @@ public String toString() { sb.append(", subStatus: ").append(status); if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO) || subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS)) { - sb.append(", keysProcessedPercent"); + sb.append(", keysProcessedPercent: ").append(keysProcessedPct); } } return sb.toString(); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java index 9b35f4c5e858..ece5e72a94ff 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java @@ -142,7 +142,7 @@ public String toString() { .append(waitTimeInMs) .append(" ms.\n"); if (subStatus != null) { - str.append("SubStatus is ") + str.append("SubStatus : ") .append(subStatus); if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS) || subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO)) { From 6f6cbb5d726b0eb08db46d2db1b5b55367efae7c Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Thu, 27 Mar 2025 18:51:02 +0530 Subject: [PATCH 6/6] Update hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java Co-authored-by: Aryan Gupta <44232823+aryangupta1998@users.noreply.github.com> --- .../org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java index 5e45d26e05c6..30db0815aa9f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java @@ -219,7 +219,7 @@ public String toString() { sb.append(", subStatus: ").append(status); if (subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_FSO) || subStatus.equals(SubStatus.OBJECT_ID_MAP_GEN_OBS)) { - sb.append(", keysProcessedPercent: ").append(keysProcessedPct); + sb.append(String.format(", keysProcessedPercent: %.2f", keysProcessedPct)); } } return sb.toString();