Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
}

HoodieTimeline filteredTimeline = new HoodieDefaultTimeline(instantsStream,
(Function<HoodieInstant, Option<byte[]>> & Serializable) metaClient.getActiveTimeline()::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) metaClient.getActiveTimeline()::getInstantDetails, timeline.getLastUpdateTime());
return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new FileStatus[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thre
metaClient.scanHoodieInstantsFromFileSystem(
new Path(metaClient.getMetaAuxiliaryPath()),
HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE,
false);
false).getKey();
} catch (FileNotFoundException e) {
/*
* On some FSs deletion of all files in the directory can auto remove the directory itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;

Expand All @@ -55,6 +56,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -489,8 +491,8 @@ public String getCommitActionType() {
* @return List of Hoodie Instants generated
* @throws IOException in case of failure
*/
public List<HoodieInstant> scanHoodieInstantsFromFileSystem(Set<String> includedExtensions,
boolean applyLayoutVersionFilters) throws IOException {
public Pair<List<HoodieInstant>, Long> scanHoodieInstantsFromFileSystem(Set<String> includedExtensions,
boolean applyLayoutVersionFilters) throws IOException {
return scanHoodieInstantsFromFileSystem(new Path(metaPath), includedExtensions, applyLayoutVersionFilters);
}

Expand All @@ -504,20 +506,22 @@ public List<HoodieInstant> scanHoodieInstantsFromFileSystem(Set<String> included
* @return List of Hoodie Instants generated
* @throws IOException in case of failure
*/
public List<HoodieInstant> scanHoodieInstantsFromFileSystem(Path timelinePath, Set<String> includedExtensions,
public Pair<List<HoodieInstant>, Long> scanHoodieInstantsFromFileSystem(Path timelinePath, Set<String> includedExtensions,
boolean applyLayoutVersionFilters) throws IOException {
Stream<HoodieInstant> instantStream = Arrays.stream(
HoodieTableMetaClient
.scanFiles(getFs(), timelinePath, path -> {
// Include only the meta files with extensions that needs to be included
String extension = HoodieInstant.getTimelineFileExtension(path.getName());
return includedExtensions.contains(extension);
})).map(HoodieInstant::new);

FileStatus[] instantsFileStatus = HoodieTableMetaClient
.scanFiles(getFs(), timelinePath, path -> {
// Include only the meta files with extensions that needs to be included
String extension = HoodieInstant.getTimelineFileExtension(path.getName());
return includedExtensions.contains(extension);
});
OptionalLong lastUpdatedTime = Arrays.stream(instantsFileStatus).mapToLong(FileStatus::getModificationTime).max();
Stream<HoodieInstant> instantStream = Arrays.stream(instantsFileStatus).map(HoodieInstant::new);

if (applyLayoutVersionFilters) {
instantStream = TimelineLayout.getLayout(getTimelineLayoutVersion()).filterHoodieInstants(instantStream);
}
return instantStream.sorted().collect(Collectors.toList());
return Pair.of(instantStream.sorted().collect(Collectors.toList()), lastUpdatedTime.orElse(0L));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> inc
// Filter all the filter in the metapath and include only the extensions passed and
// convert them into HoodieInstant
try {
this.setInstants(metaClient.scanHoodieInstantsFromFileSystem(includedExtensions, applyLayoutFilters));
Pair<List<HoodieInstant>, Long> instantsResult = metaClient.scanHoodieInstantsFromFileSystem(includedExtensions, applyLayoutFilters);
this.setInstants(instantsResult.getKey(), instantsResult.getValue());
} catch (IOException e) {
throw new HoodieIOException("Failed to scan metadata", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -90,7 +91,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
setInstants(this.loadInstants(false));
Pair<List<HoodieInstant>, Long> instantsResult = this.loadInstants(false);
setInstants(instantsResult.getKey(), instantsResult.getValue());
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
Expand Down Expand Up @@ -196,15 +198,15 @@ private Option<String> getMetadataKey(String action) {
}
}

private List<HoodieInstant> loadInstants(boolean loadInstantDetails) {
private Pair<List<HoodieInstant>, Long> loadInstants(boolean loadInstantDetails) {
return loadInstants(null, loadInstantDetails);
}

private List<HoodieInstant> loadInstants(String startTs, String endTs) {
private Pair<List<HoodieInstant>, Long> loadInstants(String startTs, String endTs) {
return loadInstants(new TimeRangeFilter(startTs, endTs), true);
}

private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
private Pair<List<HoodieInstant>, Long> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
return loadInstants(filter, loadInstantDetails, record -> true);
}

Expand All @@ -214,18 +216,21 @@ private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadIns
* If filter is specified, only the filtered instants are loaded
* If commitsFilter is specified, only the filtered records are loaded
*/
private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
private Pair<List<HoodieInstant>, Long> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
Function<GenericRecord, Boolean> commitsFilter) {
try {
// List all files
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));

long lastUpdateTime = 0L;
// Sort files by version suffix in reverse (implies reverse chronological order)
Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());

Set<HoodieInstant> instantsInRange = new HashSet<>();
for (FileStatus fs : fsStatuses) {
// update lastUpdateTime
lastUpdateTime = Math.max(lastUpdateTime, fs.getModificationTime());
// Read the archived file
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
Expand Down Expand Up @@ -279,7 +284,7 @@ private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadIns

ArrayList<HoodieInstant> result = new ArrayList<>(instantsInRange);
Collections.sort(result);
return result;
return Pair.of(result, lastUpdateTime);
} catch (IOException e) {
throw new HoodieIOException(
"Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
Expand Down Expand Up @@ -331,6 +336,6 @@ public HoodieDefaultTimeline getWriteTimeline() {
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
return new HoodieDefaultTimeline(getInstants().filter(i ->
readCommits.keySet().contains(i.getTimestamp()))
.filter(s -> validActions.contains(s.getAction())), details);
.filter(s -> validActions.contains(s.getAction())), details, getLastUpdateTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,20 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
protected transient Function<HoodieInstant, Option<byte[]>> details;
private List<HoodieInstant> instants;
private String timelineHash;
private long lastUpdatedTime;

public HoodieDefaultTimeline(Stream<HoodieInstant> instants, Function<HoodieInstant, Option<byte[]>> details) {
public HoodieDefaultTimeline(Stream<HoodieInstant> instants, Function<HoodieInstant, Option<byte[]>> details, long lastUpdatedTime) {
this.details = details;
setInstants(instants.collect(Collectors.toList()));
setInstants(instants.collect(Collectors.toList()), lastUpdatedTime);
}

public void setInstants(List<HoodieInstant> instants) {
setInstants(instants, System.currentTimeMillis());
}

public void setInstants(List<HoodieInstant> instants, long lastUpdatedTime) {
this.instants = instants;
this.lastUpdatedTime = lastUpdatedTime;
final MessageDigest md;
try {
md = MessageDigest.getInstance(HASHING_ALGORITHM);
Expand All @@ -79,106 +85,106 @@ public HoodieDefaultTimeline() {}

@Override
public HoodieTimeline filterInflights() {
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details);
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details, lastUpdatedTime);
}

@Override
public HoodieTimeline filterInflightsAndRequested() {
return new HoodieDefaultTimeline(
instants.stream().filter(i -> i.getState().equals(State.REQUESTED) || i.getState().equals(State.INFLIGHT)),
details);
details, lastUpdatedTime);
}

@Override
public HoodieTimeline filterPendingExcludingCompaction() {
return new HoodieDefaultTimeline(instants.stream().filter(instant -> (!instant.isCompleted())
&& (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))), details);
&& (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))), details, lastUpdatedTime);
}

@Override
public HoodieTimeline filterCompletedInstants() {
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details);
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details, lastUpdatedTime);
}

@Override
public HoodieTimeline filterCompletedAndCompactionInstants() {
return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
|| s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details);
|| s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details, lastUpdatedTime);
}

@Override
public HoodieDefaultTimeline getWriteTimeline() {
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details, lastUpdatedTime);
}

@Override
public HoodieTimeline getCompletedReplaceTimeline() {
return new HoodieDefaultTimeline(
instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(HoodieInstant::isCompleted), details);
instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(HoodieInstant::isCompleted), details, lastUpdatedTime);
}

@Override
public HoodieTimeline filterPendingReplaceTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(
s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details);
s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details, lastUpdatedTime);
}

@Override
public HoodieTimeline filterPendingRollbackTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(
s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && !s.isCompleted()), details);
s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && !s.isCompleted()), details, lastUpdatedTime);
}

@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
instants.stream().filter(s -> s.getAction().equals(HoodieTimeline.COMPACTION_ACTION) && !s.isCompleted()), details);
instants.stream().filter(s -> s.getAction().equals(HoodieTimeline.COMPACTION_ACTION) && !s.isCompleted()), details, lastUpdatedTime);
}

@Override
public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
return new HoodieDefaultTimeline(
instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details, lastUpdatedTime);
}

@Override
public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits),
details);
details, lastUpdatedTime);
}

@Override
public HoodieTimeline findInstantsAfter(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details);
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details, lastUpdatedTime);
}

@Override
public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime))
.limit(numCommits), details);
.limit(numCommits), details, lastUpdatedTime);
}

@Override
public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)),
details);
details, lastUpdatedTime);
}

@Override
public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)),
details);
details, lastUpdatedTime);
}

@Override
public HoodieTimeline filter(Predicate<HoodieInstant> filter) {
return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
return new HoodieDefaultTimeline(instants.stream().filter(filter), details, lastUpdatedTime);
}

/**
Expand Down Expand Up @@ -209,7 +215,7 @@ public HoodieTimeline getCommitTimeline() {
*/
public HoodieTimeline getDeltaCommitTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails, lastUpdatedTime);
}

/**
Expand All @@ -219,23 +225,23 @@ public HoodieTimeline getDeltaCommitTimeline() {
*/
public HoodieTimeline getTimelineOfActions(Set<String> actions) {
return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails, lastUpdatedTime);
}

/**
* Get only the cleaner action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getCleanerTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails, lastUpdatedTime);
}

/**
* Get only the rollback action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getRollbackTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails, lastUpdatedTime);
}

/**
Expand All @@ -250,15 +256,15 @@ public HoodieTimeline getRollbackAndRestoreTimeline() {
*/
public HoodieTimeline getSavePointTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails, lastUpdatedTime);
}

/**
* Get only the restore action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getRestoreTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails, lastUpdatedTime);
}

protected Stream<HoodieInstant> filterInstantsByAction(String action) {
Expand Down Expand Up @@ -351,6 +357,11 @@ public Option<byte[]> getInstantDetails(HoodieInstant instant) {
return details.apply(instant);
}

@Override
public long getLastUpdateTime() {
return lastUpdatedTime;
}

@Override
public boolean isEmpty(HoodieInstant instant) {
return getInstantDetails(instant).get().length == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ public interface HoodieTimeline extends Serializable {
*/
Option<byte[]> getInstantDetails(HoodieInstant instant);

long getLastUpdateTime();

boolean isEmpty(HoodieInstant instant);

/**
Expand Down
Loading