Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.common.util;

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
Expand Down Expand Up @@ -228,10 +229,8 @@ public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClien
}

/**
* Returns the oldest instant to retain between the clustering instant that there is such cleaning action or empty,
* and the latest instant before the oldest inflight clustering instant.
*
* <p>Checks whether the latest clustering instant has a subsequent cleaning action, and the oldest inflight clustering instant has a previous commit.
* Returns the oldest instant to retain.
* Make sure the clustering instant won't be archived before cleaned, and the oldest inflight clustering instant has a previous commit.
*
* @param activeTimeline The active timeline
* @param metaClient The meta client
Expand All @@ -243,23 +242,34 @@ public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
HoodieTimeline replaceTimeline = activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION));
if (!replaceTimeline.empty()) {
Option<HoodieInstant> cleanInstantOpt =
activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant();
activeTimeline.getCleanerTimeline().filterCompletedInstants().lastInstant();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, using completed clean instants is more reliable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good, cc @bvaradar for another round of review~

if (cleanInstantOpt.isPresent()) {
// The first clustering instant of which timestamp is greater than or equal to the earliest commit to retain of
// the clean metadata.
HoodieInstant cleanInstant = cleanInstantOpt.get();
String earliestCommitToRetain =
CleanerUtils.getCleanerPlan(metaClient,
cleanInstant.isRequested()
? cleanInstant
: HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
.getEarliestInstantToRetain().getTimestamp();
if (!StringUtils.isNullOrEmpty(earliestCommitToRetain)) {
oldestInstantToRetain = replaceTimeline.filterCompletedInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain))
.firstInstant();
HoodieActionInstant earliestInstantToRetain = CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested()
? cleanInstant
: HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
.getEarliestInstantToRetain();
String retainLowerBound;
if (earliestInstantToRetain != null && !StringUtils.isNullOrEmpty(earliestInstantToRetain.getTimestamp())) {
retainLowerBound = earliestInstantToRetain.getTimestamp();
} else {
// no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS clean policy,
// retain first instant after clean instant
retainLowerBound = cleanInstant.getTimestamp();
}

oldestInstantToRetain = replaceTimeline.filter(instant ->
HoodieTimeline.compareTimestamps(
instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
retainLowerBound))
.firstInstant();
} else {
oldestInstantToRetain = replaceTimeline.firstInstant();
}

Option<HoodieInstant> pendingInstantOpt = replaceTimeline.filterInflights().firstInstant();
if (pendingInstantOpt.isPresent()) {
// Get the previous commit before the first inflight clustering instant.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.util;

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
Expand Down Expand Up @@ -48,7 +49,7 @@
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests for {@link ClusteringUtils}.
Expand Down Expand Up @@ -128,7 +129,7 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
String clusterTime1 = "1";
HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
HoodieInstant completedInstant1 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
List<String> fileIds2 = new ArrayList<>();
fileIds2.add(UUID.randomUUID().toString());
fileIds2.add(UUID.randomUUID().toString());
Expand All @@ -146,29 +147,12 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
metaClient.reloadActiveTimeline();
Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
assertFalse(actual.isPresent());
// test first uncompleted clean instant is requested.
assertTrue(actual.isPresent());
assertEquals(clusterTime1, actual.get().getTimestamp(), "no clean in timeline, retain first replace commit");

String cleanTime1 = "4";
HoodieInstant requestedInstant4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder()
.setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
.setAction(completedInstant1.getAction())
.setTimestamp(completedInstant1.getTimestamp())
.setState(completedInstant1.getState().name()))
.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.setFilesToBeDeletedPerPartition(new HashMap<>())
.setVersion(CleanPlanV2MigrationHandler.VERSION)
.build();
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
metaClient.reloadActiveTimeline();
actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
assertEquals(clusterTime1, actual.get().getTimestamp());
HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4, Option.empty());
// test first uncompleted clean instant is inflight.
String cleanTime2 = "5";
HoodieInstant requestedInstant5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime2);
HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder()
.setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
.setAction(completedInstant3.getAction())
.setTimestamp(completedInstant3.getTimestamp())
Expand All @@ -177,11 +161,54 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
.setFilesToBeDeletedPerPartition(new HashMap<>())
.setVersion(CleanPlanV2MigrationHandler.VERSION)
.build();
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2));
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5, Option.empty());
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 1L, 1,
completedInstant3.getTimestamp(), "", Collections.emptyMap(), 0, Collections.emptyMap());
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
metaClient.reloadActiveTimeline();
actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
assertEquals(clusterTime3, actual.get().getTimestamp());
assertEquals(clusterTime3, actual.get().getTimestamp(),
"retain the first replace commit after the earliestInstantToRetain ");
}

/** test getOldestInstantToRetainForClustering with KEEP_LATEST_FILE_VERSIONS as clean policy */
@Test
public void testGetOldestInstantToRetainForClusteringKeepFileVersion() throws IOException {
String partitionPath1 = "partition1";
List<String> fileIds1 = new ArrayList<>();
fileIds1.add(UUID.randomUUID().toString());
String clusterTime1 = "1";
HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());

String cleanTime1 = "2";
HoodieInstant requestedInstant2 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
HoodieCleanerPlan cleanerPlan1 = new HoodieCleanerPlan(null, clusterTime1,
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(), Collections.emptyMap(),
CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(), Collections.emptyList());
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant2, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2, Option.empty());
HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 1L, 1,
"", "", Collections.emptyMap(), 0, Collections.emptyMap());
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant2,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
metaClient.reloadActiveTimeline();

List<String> fileIds2 = new ArrayList<>();
fileIds2.add(UUID.randomUUID().toString());
fileIds2.add(UUID.randomUUID().toString());
String clusterTime2 = "3";
HoodieInstant requestedInstant3 = createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2);
HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty());
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
metaClient.reloadActiveTimeline();

Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
assertEquals(clusterTime2, actual.get().getTimestamp(),
"retain the first replace commit after the last complete clean ");
}

private void validateClusteringInstant(List<String> fileIds, String partitionPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1111,9 +1111,9 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, HoodieR
ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();

// Step 5 : Make sure that firstReplaceHoodieInstant is archived.
// Step 5 : FirstReplaceHoodieInstant is retained for clean.
long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
assertEquals(0, count);
assertEquals(1, count);

// Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner.
for (String replacedFilePath : replacedFilePaths) {
Expand Down