Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
Expand Down Expand Up @@ -396,7 +397,7 @@ private Stream<HoodieInstant> getCleanInstantsToArchive() {
}).flatMap(Collection::stream);
}

private Stream<HoodieInstant> getCommitInstantsToArchive() {
private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concat
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
Expand Down Expand Up @@ -429,6 +430,8 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
: Option.empty();

Option<HoodieInstant> earliestUnCleanCompletedInstant = CleanerUtils.getEarliestUnCleanCompletedInstant(metaClient);

// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstantsAsStream()
.filter(s -> {
Expand All @@ -441,7 +444,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}
}).filter(s -> {
// Ensure commits >= oldest pending compaction commit is retained
// Ensure commits >= the oldest pending compaction commit is retained
return oldestPendingCompactionAndReplaceInstant
.map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
Expand All @@ -455,6 +458,10 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
}
return true;
}).filter(s ->
earliestUnCleanCompletedInstant.map(instant ->
compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true)
).filter(s ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We better check the timeline clean metadata files, only when all the replaced clustering files are cleaned, can the clustering instant be archived.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We better check the timeline clean metadata files, only when all the replaced clustering files are cleaned, can the clustering instant be archived.

no completed clustering instant -> no need to check
firstCompletedClusteringInstant is not null && no completed clean instant -> return firstCompletedClusteringInstant
firstCompletedClusteringInstant is not null && earliestCleanInstant is not null -> return firstUncleanClusteringInstant

oldestInstantToRetainForCompaction.map(instantToRetain ->
compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
Expand All @@ -465,7 +472,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
}
}

private Stream<HoodieInstant> getInstantsToArchive() {
private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
if (config.isMetastoreEnabled()) {
return Stream.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,12 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain())
|| (instant.getMarkerFileModificationTimestamp().isPresent()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't think the condition is needed for the commits which isn't the replacecommit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guarantee compatibility. Maybe other new kinds of instant need to clean later ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an out-of-order replace commit finished before the clean start and the instant time of the replace commit is before the earliest commit to retain, it won't be cleaned and left in the timeline. Archiver will then archive it since it's last modified time is earlier than the last clean in the timeline. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an out-of-order replace commit finished before the clean start and the instant time of the replace commit is before the earliest commit to retain, it won't be cleaned and left in the timeline. Archiver will then archive it since it's last modified time is earlier than the last clean in the timeline. What do you think?

You are right, it still won't clean the clustering instant in this scenario.

&& (HoodieTimeline.compareTimestamps(instant.getMarkerFileModificationTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getStartCleanTime()))))
&& HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
try {
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.common.table.timeline;

import org.apache.hudi.common.util.Option;

import org.apache.hadoop.fs.FileStatus;

import java.io.Serializable;
Expand Down Expand Up @@ -73,6 +75,7 @@ public enum State {
private State state = State.COMPLETED;
private String action;
private String timestamp;
private Option<String> markerFileModificationTimestamp = Option.empty();

/**
* Load the instant from the meta FileStatus.
Expand All @@ -82,6 +85,7 @@ public HoodieInstant(FileStatus fileStatus) {
String fileName = fileStatus.getPath().getName();
String fileExtension = getTimelineFileExtension(fileName);
timestamp = fileName.replace(fileExtension, "");
markerFileModificationTimestamp = Option.ofNullable(HoodieInstantTimeGenerator.parseTimeMillisToInstantTime(fileStatus.getModificationTime()));

// Next read the action for this marker
action = fileExtension.replaceFirst(".", "");
Expand Down Expand Up @@ -132,6 +136,10 @@ public String getTimestamp() {
return timestamp;
}

/**
 * Returns the modification time of the instant's timeline meta file, formatted as a Hudi
 * instant-time string, when this instant was loaded from a {@code FileStatus}; empty otherwise
 * (e.g. when constructed without a backing file).
 */
public Option<String> getMarkerFileModificationTimestamp() {
return markerFileModificationTimestamp;
}

/**
* Get the filename for this instant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public static String createNewInstantTime(long milliseconds) {
});
}

/**
 * Formats an epoch time in milliseconds as a millisecond-granularity Hudi instant-time string.
 *
 * @param milliseconds epoch time in milliseconds (e.g. a file modification time)
 * @return the instant-time string produced by {@code MILLIS_INSTANT_TIME_FORMATTER}
 */
public static String parseTimeMillisToInstantTime(long milliseconds) {
  // No try/catch needed: the original catch (DateTimeParseException e) { throw e; } was a
  // no-op rethrow, and formatting (unlike parsing) never throws DateTimeParseException.
  return MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(new Date(milliseconds)));
}

public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
try {
// Enables backwards compatibility with non-millisecond granularity instants
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
Expand All @@ -48,7 +50,7 @@
/**
* Utils for clean action.
*/
public class CleanerUtils {
public class CleanerUtils<T extends HoodieAvroPayload, I, K, O> {

private static final Logger LOG = LogManager.getLogger(CleanerUtils.class);

Expand Down Expand Up @@ -161,4 +163,42 @@ public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy cleanin
throw new IllegalArgumentException("Unsupported action type " + actionType);
}
}

/**
 * Looks up the {@code startCleanTime} recorded in the metadata of the latest completed clean
 * instant on the active timeline.
 *
 * @param metaClient meta client for the table whose timeline is inspected
 * @return the start-clean time of the most recent completed clean, or empty when no completed
 *         clean instant with readable metadata exists
 * @throws IOException if the clean metadata cannot be deserialized
 */
public static Option<String> getLatestCompletedCleanInstantCleanTime(HoodieTableMetaClient metaClient)
    throws IOException {
  HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
  Option<HoodieInstant> lastCleanInstant = activeTimeline.getCleanerTimeline().filterCompletedInstants().lastInstant();
  if (!lastCleanInstant.isPresent() || activeTimeline.isEmpty(lastCleanInstant.get())) {
    return Option.empty();
  }
  HoodieCleanMetadata cleanMetadata =
      TimelineMetadataUtils.deserializeHoodieCleanMetadata(activeTimeline.getInstantDetails(lastCleanInstant.get()).get());
  return cleanMetadata == null ? Option.empty() : Option.ofNullable(cleanMetadata.getStartCleanTime());
}

/**
* Get earliest unClean completed instant
* @param metaClient
* @return
* @throws IOException
*/
public static Option<HoodieInstant> getEarliestUnCleanCompletedInstant(HoodieTableMetaClient metaClient) throws IOException {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<HoodieInstant> firstCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants()
.filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).firstInstant();
if (!firstCompletedClusteringInstant.isPresent()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the firstCompletedClusteringInstant isn't present, it's no need to get the latest clean time from the clean metadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move it behind else.

return Option.empty();
} else {
Option<String> earliestInstantCleanTime = getLatestCompletedCleanInstantCleanTime(metaClient);
return ! earliestInstantCleanTime.isPresent()
? firstCompletedClusteringInstant
: activeTimeline.getCommitsTimeline().filterCompletedInstants().filter(instant ->
instant.getMarkerFileModificationTimestamp().isPresent()
&& HoodieTimeline.compareTimestamps(instant.getMarkerFileModificationTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestInstantCleanTime.get())).firstInstant();
}
}
}