diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 3de4f796e9b94..af59918fce6d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
@@ -228,10 +229,8 @@ public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClien
}
/**
- * Returns the oldest instant to retain between the clustering instant that there is such cleaning action or empty,
- * and the latest instant before the oldest inflight clustering instant.
- *
- * Checks whether the latest clustering instant has a subsequent cleaning action, and the oldest inflight clustering instant has a previous commit.
+ * Returns the oldest instant to retain.
+ * Ensures that a clustering instant is not archived before it has been cleaned, and that the oldest inflight clustering instant has a previous commit.
*
* @param activeTimeline The active timeline
* @param metaClient The meta client
@@ -243,23 +242,34 @@ public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
HoodieTimeline replaceTimeline = activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION));
if (!replaceTimeline.empty()) {
Option<HoodieInstant> cleanInstantOpt =
- activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant();
+ activeTimeline.getCleanerTimeline().filterCompletedInstants().lastInstant();
if (cleanInstantOpt.isPresent()) {
// The first clustering instant of which timestamp is greater than or equal to the earliest commit to retain of
// the clean metadata.
HoodieInstant cleanInstant = cleanInstantOpt.get();
- String earliestCommitToRetain =
- CleanerUtils.getCleanerPlan(metaClient,
- cleanInstant.isRequested()
- ? cleanInstant
- : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
- .getEarliestInstantToRetain().getTimestamp();
- if (!StringUtils.isNullOrEmpty(earliestCommitToRetain)) {
- oldestInstantToRetain = replaceTimeline.filterCompletedInstants()
- .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain))
- .firstInstant();
+ HoodieActionInstant earliestInstantToRetain = CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested()
+ ? cleanInstant
+ : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
+ .getEarliestInstantToRetain();
+ String retainLowerBound;
+ if (earliestInstantToRetain != null && !StringUtils.isNullOrEmpty(earliestInstantToRetain.getTimestamp())) {
+ retainLowerBound = earliestInstantToRetain.getTimestamp();
+ } else {
+ // a missing earliestInstantToRetain indicates the KEEP_LATEST_FILE_VERSIONS clean policy;
+ // retain the first replace instant at or after the clean instant
+ retainLowerBound = cleanInstant.getTimestamp();
}
+
+ oldestInstantToRetain = replaceTimeline.filter(instant ->
+ HoodieTimeline.compareTimestamps(
+ instant.getTimestamp(),
+ HoodieTimeline.GREATER_THAN_OR_EQUALS,
+ retainLowerBound))
+ .firstInstant();
+ } else {
+ oldestInstantToRetain = replaceTimeline.firstInstant();
}
+
Option<HoodieInstant> pendingInstantOpt = replaceTimeline.filterInflights().firstInstant();
if (pendingInstantOpt.isPresent()) {
// Get the previous commit before the first inflight clustering instant.
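For reference, the retention rule introduced above can be summarized in a small standalone sketch that works on plain string timestamps instead of Hudi's timeline classes; the class and method names below (ClusteringRetentionSketch, oldestReplaceToRetain) are illustrative only and are not part of the Hudi API.

import java.util.List;
import java.util.Optional;

// Illustrative sketch: choose the oldest replacecommit that must stay in the
// active timeline so it cannot be archived before the cleaner has processed it.
final class ClusteringRetentionSketch {

  static Optional<String> oldestReplaceToRetain(
      List<String> replaceCommitTimes,          // sorted replacecommit timestamps
      Optional<String> lastCompletedCleanTime,  // timestamp of the last completed clean, if any
      Optional<String> earliestToRetainTime) {  // earliestInstantToRetain from that clean's plan, if any
    if (replaceCommitTimes.isEmpty()) {
      return Optional.empty();
    }
    if (!lastCompletedCleanTime.isPresent()) {
      // No completed clean yet: every replacecommit may still need cleaning, so keep the first one.
      return Optional.of(replaceCommitTimes.get(0));
    }
    // Plans produced under KEEP_LATEST_COMMITS carry earliestInstantToRetain; plans produced under
    // KEEP_LATEST_FILE_VERSIONS do not, so fall back to the clean instant's own timestamp.
    String lowerBound = earliestToRetainTime.orElse(lastCompletedCleanTime.get());
    return replaceCommitTimes.stream()
        .filter(ts -> ts.compareTo(lowerBound) >= 0)
        .findFirst();
  }
}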
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 5235183d10fb2..87b31531786e7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -48,7 +49,7 @@
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests for {@link ClusteringUtils}.
@@ -128,7 +129,7 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
String clusterTime1 = "1";
HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
- HoodieInstant completedInstant1 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
+ metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
List<String> fileIds2 = new ArrayList<>();
fileIds2.add(UUID.randomUUID().toString());
fileIds2.add(UUID.randomUUID().toString());
@@ -146,29 +147,12 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
metaClient.reloadActiveTimeline();
Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
- assertFalse(actual.isPresent());
- // test first uncompleted clean instant is requested.
+ assertTrue(actual.isPresent());
+ assertEquals(clusterTime1, actual.get().getTimestamp(), "no clean action in the timeline, retain the first replace commit");
+
String cleanTime1 = "4";
HoodieInstant requestedInstant4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder()
- .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
- .setAction(completedInstant1.getAction())
- .setTimestamp(completedInstant1.getTimestamp())
- .setState(completedInstant1.getState().name()))
- .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
- .setFilesToBeDeletedPerPartition(new HashMap<>())
- .setVersion(CleanPlanV2MigrationHandler.VERSION)
- .build();
- metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
- metaClient.reloadActiveTimeline();
- actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
- assertEquals(clusterTime1, actual.get().getTimestamp());
- HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
- metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4, Option.empty());
- // test first uncompleted clean instant is inflight.
- String cleanTime2 = "5";
- HoodieInstant requestedInstant5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime2);
- HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder()
.setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
.setAction(completedInstant3.getAction())
.setTimestamp(completedInstant3.getTimestamp())
@@ -177,11 +161,54 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
.setFilesToBeDeletedPerPartition(new HashMap<>())
.setVersion(CleanPlanV2MigrationHandler.VERSION)
.build();
- metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2));
- metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5, Option.empty());
+ metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
+ HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
+ HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 1L, 1,
+ completedInstant3.getTimestamp(), "", Collections.emptyMap(), 0, Collections.emptyMap());
+ metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4,
+ TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
metaClient.reloadActiveTimeline();
actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
- assertEquals(clusterTime3, actual.get().getTimestamp());
+ assertEquals(clusterTime3, actual.get().getTimestamp(),
+ "retain the first replace commit after the earliestInstantToRetain ");
+ }
+
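+
The two assertions in this test line up with the illustrative sketch given after the ClusteringUtils changes; the timestamps "2" and "3" used for the second and third replacecommits below are assumed for illustration (only clusterTime1 = "1" and cleanTime1 = "4" appear in the code shown here).

// First assertion: no completed clean in the timeline yet, so the first replacecommit is retained.
Optional<String> noClean = ClusteringRetentionSketch.oldestReplaceToRetain(
    java.util.Arrays.asList("1", "2", "3"), Optional.empty(), Optional.empty());
// noClean.get() == "1"  -> matches assertEquals(clusterTime1, actual.get().getTimestamp(), ...)

// Second assertion: clean "4" completes with earliestInstantToRetain pointing at completedInstant3.
Optional<String> afterClean = ClusteringRetentionSketch.oldestReplaceToRetain(
    java.util.Arrays.asList("1", "2", "3"), Optional.of("4"), Optional.of("3"));
// afterClean.get() == "3" -> matches assertEquals(clusterTime3, actual.get().getTimestamp(), ...)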
+ /** Test getOldestInstantToRetainForClustering with KEEP_LATEST_FILE_VERSIONS as the clean policy. */
+ @Test
+ public void testGetOldestInstantToRetainForClusteringKeepFileVersion() throws IOException {
+ String partitionPath1 = "partition1";
+ List<String> fileIds1 = new ArrayList<>();
+ fileIds1.add(UUID.randomUUID().toString());
+ String clusterTime1 = "1";
+ HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+ HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
+ metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
+
+ String cleanTime1 = "2";
+ HoodieInstant requestedInstant2 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
+ HoodieCleanerPlan cleanerPlan1 = new HoodieCleanerPlan(null, clusterTime1,
+ HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(), Collections.emptyMap(),
+ CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(), Collections.emptyList());
+ metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant2, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
+ HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2, Option.empty());
+ HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 1L, 1,
+ "", "", Collections.emptyMap(), 0, Collections.emptyMap());
+ metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant2,
+ TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+ metaClient.reloadActiveTimeline();
+
+ List<String> fileIds2 = new ArrayList<>();
+ fileIds2.add(UUID.randomUUID().toString());
+ fileIds2.add(UUID.randomUUID().toString());
+ String clusterTime2 = "3";
+ HoodieInstant requestedInstant3 = createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2);
+ HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty());
+ metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
+ metaClient.reloadActiveTimeline();
+
+ Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+ assertEquals(clusterTime2, actual.get().getTimestamp(),
+ "retain the first replace commit after the last complete clean ");
}
private void validateClusteringInstant(List<String> fileIds, String partitionPath,
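Similarly, the expectation of the new KEEP_LATEST_FILE_VERSIONS test can be reproduced with the same illustrative sketch, this time with values that are all taken from the test above.

// Timeline built by testGetOldestInstantToRetainForClusteringKeepFileVersion:
// replacecommits at "1" (clusterTime1) and "3" (clusterTime2), and a completed clean at "2"
// whose KEEP_LATEST_FILE_VERSIONS plan carries no earliestInstantToRetain.
Optional<String> oldest = ClusteringRetentionSketch.oldestReplaceToRetain(
    java.util.Arrays.asList("1", "3"), Optional.of("2"), Optional.empty());
// oldest.get() == "3" -> matches assertEquals(clusterTime2, actual.get().getTimestamp(), ...)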
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index fe2e170299340..43b5d46bbc07e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1111,9 +1111,9 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, HoodieR
ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
- // Step 5 : Make sure that firstReplaceHoodieInstant is archived.
+ // Step 5 : firstReplaceHoodieInstant is retained in the active timeline for the cleaner instead of being archived.
long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
- assertEquals(0, count);
+ assertEquals(1, count);
// Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner.
for (String replacedFilePath : replacedFilePaths) {