From e52b94809103b6fc2bb39dedaf9f88dff133bf6f Mon Sep 17 00:00:00 2001 From: zhuanshenbsj1 Date: Wed, 7 Dec 2022 13:37:58 +0800 Subject: [PATCH 1/7] IncrementalCleaning consider later clustering --- .../org/apache/hudi/table/action/clean/CleanPlanner.java | 8 ++++++-- .../apache/hudi/common/table/timeline/HoodieInstant.java | 9 +++++++++ .../table/timeline/HoodieInstantTimeGenerator.java | 9 +++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 396b47ae0a30c..4aa8df4ab2eb0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -188,8 +188,12 @@ private List getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() + ". New Instant to retain : " + newInstantToRetain); return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter( - instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, - cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), + instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, + cleanMetadata.getEarliestCommitToRetain()) + || (instant.getMarkerFileAccessTimestamp().isPresent() + && (HoodieTimeline.compareTimestamps(instant.getMarkerFileAccessTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, + cleanMetadata.getStartCleanTime())))) + && HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> { try { if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index 0115742e07a08..fd7e704c09d7b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.timeline; import org.apache.hadoop.fs.FileStatus; +import org.apache.hudi.common.util.Option; import java.io.Serializable; import java.util.Comparator; @@ -73,6 +74,7 @@ public enum State { private State state = State.COMPLETED; private String action; private String timestamp; + private Option markerFileAccessTimestamp ; /** * Load the instant from the meta FileStatus. @@ -82,6 +84,7 @@ public HoodieInstant(FileStatus fileStatus) { String fileName = fileStatus.getPath().getName(); String fileExtension = getTimelineFileExtension(fileName); timestamp = fileName.replace(fileExtension, ""); + markerFileAccessTimestamp = Option.ofNullable(HoodieInstantTimeGenerator.parseTimeMillisToInstantTime(fileStatus.getAccessTime())); // Next read the action for this marker action = fileExtension.replaceFirst(".", ""); @@ -104,12 +107,14 @@ public HoodieInstant(boolean isInflight, String action, String timestamp) { this.state = isInflight ? State.INFLIGHT : State.COMPLETED; this.action = action; this.timestamp = timestamp; + this.markerFileAccessTimestamp = Option.ofNullable(null); } public HoodieInstant(State state, String action, String timestamp) { this.state = state; this.action = action; this.timestamp = timestamp; + this.markerFileAccessTimestamp = Option.ofNullable(null); } public boolean isCompleted() { @@ -132,6 +137,10 @@ public String getTimestamp() { return timestamp; } + public Option getMarkerFileAccessTimestamp(){ + return markerFileAccessTimestamp; + } + /** * Get the filename for this instant. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index e839e73669e90..abc2992ec3e1a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -81,6 +81,15 @@ public static String createNewInstantTime(long milliseconds) { }); } + public static String parseTimeMillisToInstantTime(long milliseconds){ + try { + Date d = new Date(milliseconds); + return MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d)); + } catch (DateTimeParseException e) { + throw e; + } + } + public static Date parseDateFromInstantTime(String timestamp) throws ParseException { try { // Enables backwards compatibility with non-millisecond granularity instants From ff0887aff8930a1813f4863563e12d8b3db1b20e Mon Sep 17 00:00:00 2001 From: zhuanshenbsj1 Date: Wed, 7 Dec 2022 23:57:12 +0800 Subject: [PATCH 2/7] fix checkstyle error --- .../apache/hudi/common/table/timeline/HoodieInstant.java | 7 ++++--- .../common/table/timeline/HoodieInstantTimeGenerator.java | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index fd7e704c09d7b..5a2081c1a7f85 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -18,9 +18,10 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hadoop.fs.FileStatus; import org.apache.hudi.common.util.Option; +import org.apache.hadoop.fs.FileStatus; + import java.io.Serializable; import java.util.Comparator; import java.util.HashMap; @@ -74,7 +75,7 @@ public enum State { private State state = State.COMPLETED; private String action; private String timestamp; - private Option markerFileAccessTimestamp ; + private Option markerFileAccessTimestamp; /** * Load the instant from the meta FileStatus. @@ -137,7 +138,7 @@ public String getTimestamp() { return timestamp; } - public Option getMarkerFileAccessTimestamp(){ + public Option getMarkerFileAccessTimestamp() { return markerFileAccessTimestamp; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index abc2992ec3e1a..03d068b190958 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -81,7 +81,7 @@ public static String createNewInstantTime(long milliseconds) { }); } - public static String parseTimeMillisToInstantTime(long milliseconds){ + public static String parseTimeMillisToInstantTime(long milliseconds) { try { Date d = new Date(milliseconds); return MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d)); From ff02d94ca1052398e61770c74900337f2369959e Mon Sep 17 00:00:00 2001 From: zhuanshenbsj1 Date: Fri, 16 Dec 2022 01:09:23 +0800 Subject: [PATCH 3/7] add archive commit for clean to clustering --- .../apache/hudi/client/HoodieTimelineArchiver.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index a61a5c9008293..d4212c4196a0c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FileIOUtils; @@ -396,7 +397,7 @@ private Stream getCleanInstantsToArchive() { }).flatMap(Collection::stream); } - private Stream getCommitInstantsToArchive() { + private Stream getCommitInstantsToArchive() throws IOException { // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify // with logic above to avoid Stream.concat HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); @@ -429,6 +430,8 @@ private Stream getCommitInstantsToArchive() { table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax()) : Option.empty(); + Option earlestUnCleanCompletedInstant = CleanerUtils.getEarliestUnCleanCompletedInstant(metaClient); + // Actually do the commits Stream instantToArchiveStream = commitTimeline.getInstantsAsStream() .filter(s -> { @@ -455,6 +458,10 @@ private Stream getCommitInstantsToArchive() { } return true; }).filter(s -> + earlestUnCleanCompletedInstant.map(instant -> + compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) + .orElse(true) + ).filter(s -> oldestInstantToRetainForCompaction.map(instantToRetain -> compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp())) .orElse(true) @@ -465,7 +472,7 @@ private Stream getCommitInstantsToArchive() { } } - private Stream getInstantsToArchive() { + private Stream getInstantsToArchive() throws IOException { Stream instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive()); if (config.isMetastoreEnabled()) { return Stream.empty(); From b7a1071aa728627a1457eafb26f323b3e1b44e5b Mon Sep 17 00:00:00 2001 From: zhuanshenbsj1 Date: Fri, 16 Dec 2022 01:10:22 +0800 Subject: [PATCH 4/7] add archive commit for clean to clustering +1 --- .../apache/hudi/common/util/CleanerUtils.java | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 704dacc6073d8..2dc58e0e9d5e8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -24,8 +24,10 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.model.CleanFileInfo; +import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -48,7 +50,7 @@ /** * Utils for clean action. */ -public class CleanerUtils { +public class CleanerUtils { private static final Logger LOG = LogManager.getLogger(CleanerUtils.class); @@ -161,4 +163,46 @@ public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy cleanin throw new IllegalArgumentException("Unsupported action type " + actionType); } } + + /** + * Get latest clean planner clean time. + * @param metaClient + * @return Latest clean planner clean time. + * @throws IOException + */ + public static Option getLatestInstantCleanTime(HoodieTableMetaClient metaClient) + throws IOException { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + Option lastCleanInstantOption = activeTimeline.getCleanerTimeline().lastInstant(); + Option cleanMetadata = lastCleanInstantOption.isPresent() && ! activeTimeline.isEmpty(lastCleanInstantOption.get()) + ? Option.ofNullable(TimelineMetadataUtils.deserializeHoodieCleanMetadata(activeTimeline.getInstantDetails(lastCleanInstantOption.get()).get())) + : Option.empty(); + return cleanMetadata.isPresent() ? Option.ofNullable(cleanMetadata.get().getStartCleanTime()) : Option.empty(); + } + + /** + * Get earliest unClean completed instant + * @param metaClient + * @return + * @throws IOException + */ + public static Option getEarliestUnCleanCompletedInstant(HoodieTableMetaClient metaClient) throws IOException { + Option earliestInstantCleanTime = getLatestInstantCleanTime(metaClient); + if (earliestInstantCleanTime.isPresent()) { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + return activeTimeline.getCommitsTimeline().filterCompletedInstants().filter(instant -> + instant.getMarkerFileAccessTimestamp().isPresent() + && HoodieTimeline.compareTimestamps(instant.getMarkerFileAccessTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestInstantCleanTime.get())).firstInstant(); + } else { + // if no clean instant, return first completed clustering instant + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + Option firstCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants() + .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).firstInstant(); + if (firstCompletedClusteringInstant.isPresent()) { + return firstCompletedClusteringInstant; + } else { + return Option.empty(); + } + } + } } From 1fb15d7c2ed329ede870d965ca1ba9d25a551233 Mon Sep 17 00:00:00 2001 From: zhuanshenbsj1 Date: Tue, 27 Dec 2022 01:17:54 +0800 Subject: [PATCH 5/7] change accessTime to modificationTime --- .../hudi/table/action/clean/CleanPlanner.java | 4 +-- .../common/table/timeline/HoodieInstant.java | 12 ++++----- .../apache/hudi/common/util/CleanerUtils.java | 26 +++++++++---------- 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 4aa8df4ab2eb0..ef3cc8ea8ea8f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -190,8 +190,8 @@ private List getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter( instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, cleanMetadata.getEarliestCommitToRetain()) - || (instant.getMarkerFileAccessTimestamp().isPresent() - && (HoodieTimeline.compareTimestamps(instant.getMarkerFileAccessTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, + || (instant.getMarkerFileModificationTimestamp().isPresent() + && (HoodieTimeline.compareTimestamps(instant.getMarkerFileModificationTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, cleanMetadata.getStartCleanTime())))) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index 5a2081c1a7f85..83b21115ddadb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -75,7 +75,7 @@ public enum State { private State state = State.COMPLETED; private String action; private String timestamp; - private Option markerFileAccessTimestamp; + private Option markerFileModificationTimestamp; /** * Load the instant from the meta FileStatus. @@ -85,7 +85,7 @@ public HoodieInstant(FileStatus fileStatus) { String fileName = fileStatus.getPath().getName(); String fileExtension = getTimelineFileExtension(fileName); timestamp = fileName.replace(fileExtension, ""); - markerFileAccessTimestamp = Option.ofNullable(HoodieInstantTimeGenerator.parseTimeMillisToInstantTime(fileStatus.getAccessTime())); + markerFileModificationTimestamp = Option.ofNullable(HoodieInstantTimeGenerator.parseTimeMillisToInstantTime(fileStatus.getModificationTime())); // Next read the action for this marker action = fileExtension.replaceFirst(".", ""); @@ -108,14 +108,14 @@ public HoodieInstant(boolean isInflight, String action, String timestamp) { this.state = isInflight ? State.INFLIGHT : State.COMPLETED; this.action = action; this.timestamp = timestamp; - this.markerFileAccessTimestamp = Option.ofNullable(null); + this.markerFileModificationTimestamp = Option.ofNullable(null); } public HoodieInstant(State state, String action, String timestamp) { this.state = state; this.action = action; this.timestamp = timestamp; - this.markerFileAccessTimestamp = Option.ofNullable(null); + this.markerFileModificationTimestamp = Option.ofNullable(null); } public boolean isCompleted() { @@ -138,8 +138,8 @@ public String getTimestamp() { return timestamp; } - public Option getMarkerFileAccessTimestamp() { - return markerFileAccessTimestamp; + public Option getMarkerFileModificationTimestamp() { + return markerFileModificationTimestamp; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 2dc58e0e9d5e8..b95ec9ab5a6a6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -188,21 +188,19 @@ public static Option getLatestInstantCleanTime(HoodieTableMetaClient met */ public static Option getEarliestUnCleanCompletedInstant(HoodieTableMetaClient metaClient) throws IOException { Option earliestInstantCleanTime = getLatestInstantCleanTime(metaClient); - if (earliestInstantCleanTime.isPresent()) { - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - return activeTimeline.getCommitsTimeline().filterCompletedInstants().filter(instant -> - instant.getMarkerFileAccessTimestamp().isPresent() - && HoodieTimeline.compareTimestamps(instant.getMarkerFileAccessTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestInstantCleanTime.get())).firstInstant(); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + Option firstCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants() + .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).firstInstant(); + Option lastCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants() + .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant(); + if (!lastCompletedClusteringInstant.isPresent()) { + return Option.empty(); } else { - // if no clean instant, return first completed clustering instant - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - Option firstCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants() - .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).firstInstant(); - if (firstCompletedClusteringInstant.isPresent()) { - return firstCompletedClusteringInstant; - } else { - return Option.empty(); - } + return ! earliestInstantCleanTime.isPresent() + ? firstCompletedClusteringInstant + : activeTimeline.getCommitsTimeline().filterCompletedInstants().filter(instant -> + instant.getMarkerFileModificationTimestamp().isPresent() + && HoodieTimeline.compareTimestamps(instant.getMarkerFileModificationTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestInstantCleanTime.get())).firstInstant(); } } } From 677991282ce2a2e034dfdce41a8aecfb456e6c0c Mon Sep 17 00:00:00 2001 From: zhuanshenbsj1 Date: Tue, 27 Dec 2022 01:30:19 +0800 Subject: [PATCH 6/7] no need to get lastCompletedClusteringInstant --- .../main/java/org/apache/hudi/common/util/CleanerUtils.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index b95ec9ab5a6a6..f8447af580282 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -191,9 +191,7 @@ public static Option getEarliestUnCleanCompletedInstant(HoodieTab HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); Option firstCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants() .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).firstInstant(); - Option lastCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants() - .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant(); - if (!lastCompletedClusteringInstant.isPresent()) { + if (!firstCompletedClusteringInstant.isPresent()) { return Option.empty(); } else { return ! earliestInstantCleanTime.isPresent() From cea266de563494230bdbeb46a348a2b5026f10b5 Mon Sep 17 00:00:00 2001 From: zhuanshenbsj1 Date: Tue, 27 Dec 2022 16:07:43 +0800 Subject: [PATCH 7/7] Adjust getCleantime: cleanInstant -> completed cleanInstant --- .../org/apache/hudi/client/HoodieTimelineArchiver.java | 6 +++--- .../apache/hudi/common/table/timeline/HoodieInstant.java | 4 +--- .../java/org/apache/hudi/common/util/CleanerUtils.java | 8 ++++---- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index d4212c4196a0c..360514a0e289e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -430,7 +430,7 @@ private Stream getCommitInstantsToArchive() throws IOException { table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax()) : Option.empty(); - Option earlestUnCleanCompletedInstant = CleanerUtils.getEarliestUnCleanCompletedInstant(metaClient); + Option earliestUnCleanCompletedInstant = CleanerUtils.getEarliestUnCleanCompletedInstant(metaClient); // Actually do the commits Stream instantToArchiveStream = commitTimeline.getInstantsAsStream() @@ -444,7 +444,7 @@ private Stream getCommitInstantsToArchive() throws IOException { return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp())); } }).filter(s -> { - // Ensure commits >= oldest pending compaction commit is retained + // Ensure commits >= the oldest pending compaction commit is retained return oldestPendingCompactionAndReplaceInstant .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) .orElse(true); @@ -458,7 +458,7 @@ private Stream getCommitInstantsToArchive() throws IOException { } return true; }).filter(s -> - earlestUnCleanCompletedInstant.map(instant -> + earliestUnCleanCompletedInstant.map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) .orElse(true) ).filter(s -> diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index 83b21115ddadb..9c13e90063add 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -75,7 +75,7 @@ public enum State { private State state = State.COMPLETED; private String action; private String timestamp; - private Option markerFileModificationTimestamp; + private Option markerFileModificationTimestamp = Option.empty(); /** * Load the instant from the meta FileStatus. @@ -108,14 +108,12 @@ public HoodieInstant(boolean isInflight, String action, String timestamp) { this.state = isInflight ? State.INFLIGHT : State.COMPLETED; this.action = action; this.timestamp = timestamp; - this.markerFileModificationTimestamp = Option.ofNullable(null); } public HoodieInstant(State state, String action, String timestamp) { this.state = state; this.action = action; this.timestamp = timestamp; - this.markerFileModificationTimestamp = Option.ofNullable(null); } public boolean isCompleted() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index f8447af580282..1f900cd7ea48f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -165,15 +165,15 @@ public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy cleanin } /** - * Get latest clean planner clean time. + * Get latest completed clean planner clean time. * @param metaClient * @return Latest clean planner clean time. * @throws IOException */ - public static Option getLatestInstantCleanTime(HoodieTableMetaClient metaClient) + public static Option getLatestCompletedCleanInstantCleanTime(HoodieTableMetaClient metaClient) throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - Option lastCleanInstantOption = activeTimeline.getCleanerTimeline().lastInstant(); + Option lastCleanInstantOption = activeTimeline.getCleanerTimeline().filterCompletedInstants().lastInstant(); Option cleanMetadata = lastCleanInstantOption.isPresent() && ! activeTimeline.isEmpty(lastCleanInstantOption.get()) ? Option.ofNullable(TimelineMetadataUtils.deserializeHoodieCleanMetadata(activeTimeline.getInstantDetails(lastCleanInstantOption.get()).get())) : Option.empty(); @@ -187,13 +187,13 @@ public static Option getLatestInstantCleanTime(HoodieTableMetaClient met * @throws IOException */ public static Option getEarliestUnCleanCompletedInstant(HoodieTableMetaClient metaClient) throws IOException { - Option earliestInstantCleanTime = getLatestInstantCleanTime(metaClient); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); Option firstCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants() .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).firstInstant(); if (!firstCompletedClusteringInstant.isPresent()) { return Option.empty(); } else { + Option earliestInstantCleanTime = getLatestCompletedCleanInstantCleanTime(metaClient); return ! earliestInstantCleanTime.isPresent() ? firstCompletedClusteringInstant : activeTimeline.getCommitsTimeline().filterCompletedInstants().filter(instant ->