diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 396b47ae0a30c..4aa8df4ab2eb0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -188,8 +188,12 @@ private List getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() + ". New Instant to retain : " + newInstantToRetain); return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter( - instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, - cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), + instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, + cleanMetadata.getEarliestCommitToRetain()) + || (instant.getMarkerFileAccessTimestamp().isPresent() + && (HoodieTimeline.compareTimestamps(instant.getMarkerFileAccessTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, + cleanMetadata.getStartCleanTime())))) + && HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> { try { if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index 0115742e07a08..5a2081c1a7f85 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -18,6 +18,8 @@ package org.apache.hudi.common.table.timeline; +import org.apache.hudi.common.util.Option; + import org.apache.hadoop.fs.FileStatus; import java.io.Serializable; @@ -73,6 +75,7 @@ public enum State { private State state = State.COMPLETED; private String action; private String timestamp; + private Option markerFileAccessTimestamp; /** * Load the instant from the meta FileStatus. @@ -82,6 +85,7 @@ public HoodieInstant(FileStatus fileStatus) { String fileName = fileStatus.getPath().getName(); String fileExtension = getTimelineFileExtension(fileName); timestamp = fileName.replace(fileExtension, ""); + markerFileAccessTimestamp = Option.ofNullable(HoodieInstantTimeGenerator.parseTimeMillisToInstantTime(fileStatus.getAccessTime())); // Next read the action for this marker action = fileExtension.replaceFirst(".", ""); @@ -104,12 +108,14 @@ public HoodieInstant(boolean isInflight, String action, String timestamp) { this.state = isInflight ? State.INFLIGHT : State.COMPLETED; this.action = action; this.timestamp = timestamp; + this.markerFileAccessTimestamp = Option.ofNullable(null); } public HoodieInstant(State state, String action, String timestamp) { this.state = state; this.action = action; this.timestamp = timestamp; + this.markerFileAccessTimestamp = Option.ofNullable(null); } public boolean isCompleted() { @@ -132,6 +138,10 @@ public String getTimestamp() { return timestamp; } + public Option getMarkerFileAccessTimestamp() { + return markerFileAccessTimestamp; + } + /** * Get the filename for this instant. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index e839e73669e90..03d068b190958 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -81,6 +81,15 @@ public static String createNewInstantTime(long milliseconds) { }); } + public static String parseTimeMillisToInstantTime(long milliseconds) { + try { + Date d = new Date(milliseconds); + return MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d)); + } catch (DateTimeParseException e) { + throw e; + } + } + public static Date parseDateFromInstantTime(String timestamp) throws ParseException { try { // Enables backwards compatibility with non-millisecond granularity instants