@@ -47,6 +47,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -399,7 +400,7 @@ private Stream<HoodieInstant> getCleanInstantsToArchive() {
}).flatMap(Collection::stream);
}

-private Stream<HoodieInstant> getCommitInstantsToArchive() {
+private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concat
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -432,6 +433,11 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
: Option.empty();

// The clustering commit instant cannot be archived unless we ensure that the replaced files have been cleaned;
// without the replaced-file metadata on the timeline, the fs view would expose duplicates to readers.
Option<HoodieInstant> oldestInstantToRetainForClustering =
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());

// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstantsAsStream()
.filter(s -> {
@@ -444,7 +450,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}
}).filter(s -> {
-// Ensure commits >= oldest pending compaction commit is retained
+// Ensure commits >= the oldest pending compaction commit are retained
return oldestPendingCompactionAndReplaceInstant
.map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
@@ -461,14 +467,18 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
oldestInstantToRetainForCompaction.map(instantToRetain ->
compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
).filter(s ->
oldestInstantToRetainForClustering.map(instantToRetain ->
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
);
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
return Stream.empty();
}
}

-private Stream<HoodieInstant> getInstantsToArchive() {
+private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
if (config.isMetastoreEnabled()) {
return Stream.empty();
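At the end of the filter chain above, an instant is archivable only when it sorts strictly before every retention bound that is present; an absent bound imposes no constraint, via `Option.map(...).orElse(true)`. A minimal, self-contained sketch of that composition (plain strings stand in for instant timestamps, which Hudi compares lexicographically; the class and method names here are illustrative, not Hudi APIs):

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ArchivalBoundSketch {
  // An instant is archivable only if it sorts strictly before every retention
  // bound that is present; an absent bound imposes no constraint (orElse(true)),
  // mirroring the Option.map(...).orElse(true) chain in the diff above.
  static List<String> archivable(Stream<String> candidates,
                                 Optional<String> compactionBound,
                                 Optional<String> clusteringBound) {
    return candidates
        .filter(ts -> compactionBound.map(b -> ts.compareTo(b) < 0).orElse(true))
        .filter(ts -> clusteringBound.map(b -> ts.compareTo(b) < 0).orElse(true))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Compaction bound "005", clustering bound "003": only 001 and 002 may be archived.
    System.out.println(archivable(
        Stream.of("001", "002", "003", "004", "005"),
        Optional.of("005"), Optional.of("003"))); // prints [001, 002]
  }
}
```

With a clustering bound of "003", instants "003" through "005" stay on the active timeline even though the compaction bound alone would only retain "005".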
@@ -487,7 +487,24 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
int hoursRetained = config.getCleanerHoursRetained();
if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
&& commitTimeline.countInstants() > commitsRetained) {
-earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
[Contributor]: minor. singular for "earliestPendingCommits"
.getActiveTimeline()
.getCommitsTimeline()
.filter(s -> !s.isCompleted()).firstInstant();
if (earliestPendingCommits.isPresent()) {
// Earliest commit to retain must not be later than the earliest pending commit
earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
[Contributor]: We should use the timestamp compare apis in HoodieTimeline:
HoodieTimeline.compareTimestamps(commit1Ts, GREATER_THAN, commit2Ts)
return Option.of(nthInstant);
} else {
return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
}
}).orElse(Option.empty());
} else {
earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
- commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
}
} else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
Instant instant = Instant.now();
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
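Following up on the reviewer's comment above, a self-contained sketch of the suggested comparison style. The BiPredicate stand-ins mirror HoodieTimeline.LESSER_THAN_OR_EQUALS and HoodieTimeline.compareTimestamps, which this diff already uses elsewhere; the sketch class itself is illustrative, not part of the PR:

```java
import java.util.function.BiPredicate;

public class TimestampCompareSketch {
  // Stand-in for HoodieTimeline.LESSER_THAN_OR_EQUALS: Hudi instant times are
  // fixed-width strings, so lexicographic order is chronological order.
  static final BiPredicate<String, String> LESSER_THAN_OR_EQUALS =
      (a, b) -> a.compareTo(b) <= 0;

  // Stand-in for HoodieTimeline.compareTimestamps(commit1Ts, predicate, commit2Ts).
  static boolean compareTimestamps(String t1, BiPredicate<String, String> p, String t2) {
    return p.test(t1, t2);
  }

  public static void main(String[] args) {
    String nthInstantTs = "20230101120000";      // candidate earliest commit to retain
    String earliestPendingTs = "20230101130000"; // first pending commit on the timeline
    // Equivalent of nthInstant.compareTo(earliestPendingCommits.get()) <= 0 in the
    // diff above, written with the timeline comparator the reviewer asks for:
    System.out.println(
        compareTimestamps(nthInstantTs, LESSER_THAN_OR_EQUALS, earliestPendingTs)); // true
  }
}
```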
@@ -30,6 +30,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -224,4 +225,40 @@ public static List<HoodieInstant> getPendingClusteringInstantTimes(HoodieTableMetaClient
public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
return getClusteringPlan(metaClient, instant).isPresent();
}

/**
* Checks whether the latest clustering instant has a subsequent cleaning action. Returns
* the clustering instant if there is such a cleaning action, or empty otherwise.
*
* @param activeTimeline The active timeline
* @param metaClient The meta client
* @return the oldest instant to retain for clustering
*/
public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException {
HoodieTimeline replaceTimeline = activeTimeline.getCompletedReplaceTimeline();
if (!replaceTimeline.empty()) {
Option<HoodieInstant> cleanInstantOpt =
activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant();
if (cleanInstantOpt.isPresent()) {
// Returns the first clustering instant whose timestamp is greater than or equal to the
// earliest commit to retain recorded in the clean metadata.
HoodieInstant cleanInstant = cleanInstantOpt.get();
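// Note: the cleaner plan is serialized into the *requested* clean instant (see
// saveToCleanRequested in the test below), so even for an inflight clean the plan
// is read back from the corresponding requested instant.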
String earliestCommitToRetain =
CleanerUtils.getCleanerPlan(metaClient,
cleanInstant.isRequested()
? cleanInstant
: HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
.getEarliestInstantToRetain().getTimestamp();
return StringUtils.isNullOrEmpty(earliestCommitToRetain)
? Option.empty()
: replaceTimeline.filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
earliestCommitToRetain))
.firstInstant();
}
}
return Option.empty();
}
}
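For orientation, this is how the archival change in the first file consumes the new helper (condensed from the diff above; `table` and `candidate` are contextual to that class, not new API):

```java
// Condensed from getCommitInstantsToArchive(); assumes the Hudi classes imported above.
Option<HoodieInstant> oldestInstantToRetainForClustering =
    ClusteringUtils.getOldestInstantToRetainForClustering(
        table.getActiveTimeline(), table.getMetaClient());
boolean archivable = oldestInstantToRetainForClustering
    .map(retain -> HoodieTimeline.compareTimestamps(
        candidate.getTimestamp(), HoodieTimeline.LESSER_THAN, retain.getTimestamp()))
    .orElse(true); // no bound => clustering imposes no archival constraint
```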
@@ -18,17 +18,21 @@

package org.apache.hudi.common.util;

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;

import org.junit.jupiter.api.BeforeEach;
@@ -44,6 +48,7 @@
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

/**
* Tests for {@link ClusteringUtils}.
@@ -115,6 +120,70 @@ public void testClusteringPlanInflight() throws Exception {
assertEquals(requestedClusteringPlan, inflightClusteringPlan);
}

@Test
public void testGetOldestInstantToRetainForClustering() throws IOException {
String partitionPath1 = "partition1";
List<String> fileIds1 = new ArrayList<>();
fileIds1.add(UUID.randomUUID().toString());
String clusterTime1 = "1";
HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
HoodieInstant completedInstant1 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
List<String> fileIds2 = new ArrayList<>();
fileIds2.add(UUID.randomUUID().toString());
fileIds2.add(UUID.randomUUID().toString());
String clusterTime2 = "2";
HoodieInstant requestedInstant2 = createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2);
HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant2, Option.empty());
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant2, Option.empty());
List<String> fileIds3 = new ArrayList<>();
fileIds3.add(UUID.randomUUID().toString());
fileIds3.add(UUID.randomUUID().toString());
fileIds3.add(UUID.randomUUID().toString());
String clusterTime3 = "3";
HoodieInstant requestedInstant3 = createRequestedReplaceInstant(partitionPath1, clusterTime3, fileIds3);
HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty());
HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
metaClient.reloadActiveTimeline();
Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
assertFalse(actual.isPresent());
// test first uncompleted clean instant is requested.
String cleanTime1 = "4";
HoodieInstant requestedInstant4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder()
.setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
.setAction(completedInstant1.getAction())
.setTimestamp(completedInstant1.getTimestamp())
.setState(completedInstant1.getState().name()))
.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.setFilesToBeDeletedPerPartition(new HashMap<>())
.setVersion(CleanPlanV2MigrationHandler.VERSION)
.build();
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
metaClient.reloadActiveTimeline();
actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
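// cleanerPlan1 retains from completedInstant1 ("1"), so clustering instant "1" is the
// oldest replace commit whose replaced files may still be uncleaned.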
assertEquals(clusterTime1, actual.get().getTimestamp());
HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4, Option.empty());
// test first uncompleted clean instant is inflight.
String cleanTime2 = "5";
HoodieInstant requestedInstant5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime2);
HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder()
.setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
.setAction(completedInstant3.getAction())
.setTimestamp(completedInstant3.getTimestamp())
.setState(completedInstant3.getState().name()))
.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.setFilesToBeDeletedPerPartition(new HashMap<>())
.setVersion(CleanPlanV2MigrationHandler.VERSION)
.build();
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2));
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5, Option.empty());
metaClient.reloadActiveTimeline();
actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
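// cleanerPlan2 retains from completedInstant3 ("3"); replace instants "1" and "2" are
// already cleaned up to that boundary, so "3" is the oldest to retain.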
assertEquals(clusterTime3, actual.get().getTimestamp());
}

private void validateClusteringInstant(List<String> fileIds, String partitionPath,
String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
for (String fileId : fileIds) {