Merged
@@ -156,7 +156,7 @@ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient)
*/
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
fs = null; // will be lazily inited
fs = null; // will be lazily initialized
}

private void writeObject(java.io.ObjectOutputStream out) throws IOException {
@@ -330,7 +330,7 @@ public FileSystemRetryConfig getFileSystemRetryConfig() {
* Get the archived commits as a timeline. This is a costly operation, as all data from the archived files is read.
* This should not be used unless for historical debugging purposes.
*
* @return Active commit timeline
* @return Archived commit timeline
*/
public synchronized HoodieArchivedTimeline getArchivedTimeline() {
if (archivedTimeline == null) {
@@ -339,6 +339,20 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() {
return archivedTimeline;
}

/**
* Returns a freshly loaded archived commit timeline starting from startTs (inclusive).
*
* <p>This is a costly operation if a really early startTs is specified.
* Be cautious and use this only when the time range is short.
*
* <p>This method is not thread safe.
*
* @return Archived commit timeline
*/
public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
return new HoodieArchivedTimeline(this, startTs);
}

/**
* Validate table properties.
* @param properties Properties from writeConfig.
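To make the new method above concrete, here is a minimal usage sketch. The base path, the Configuration, and the "20220101000000" timestamp are placeholder assumptions for illustration, not values from this change; only getArchivedTimeline() and the new getArchivedTimeline(String startTs) overload come from the class shown above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;

public class ArchivedTimelineUsageSketch {
  public static void main(String[] args) {
    // Hypothetical table location; any existing Hudi table base path would do.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath("/tmp/hoodie/sample_table")
        .build();

    // Existing behavior: cached instance that loads the whole archived timeline.
    HoodieArchivedTimeline fullArchived = metaClient.getArchivedTimeline();

    // New overload: a fresh, uncached timeline bounded below by startTs (inclusive).
    HoodieArchivedTimeline recentArchived = metaClient.getArchivedTimeline("20220101000000");

    // Completed archived commits at or after the start timestamp.
    recentArchived.getCommitsTimeline().filterCompletedInstants()
        .getInstants()
        .forEach(instant -> System.out.println(instant.getTimestamp()));
  }
}

Because the bounded overload is neither cached nor synchronized (its javadoc notes it is not thread safe), it seems sensible for callers to hold onto the returned instance rather than calling it repeatedly.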
@@ -79,13 +79,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private static final String ACTION_TYPE_KEY = "actionType";
private static final String ACTION_STATE = "actionState";
private HoodieTableMetaClient metaClient;
private Map<String, byte[]> readCommits = new HashMap<>();
private final Map<String, byte[]> readCommits = new HashMap<>();

private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);

/**
* Loads instants between (startTs, endTs].
* Note that there is no lazy loading, so this may not work if really long time range (endTs-startTs) is specified.
* Loads all the archived instants.
* Note that there is no lazy loading, so this may not work if the archived timeline range is really long.
* TBD: Should we enforce maximum time range?
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
@@ -96,6 +96,19 @@ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
}

/**
* Loads completed instants from startTs (inclusive).
* Note that there is no lazy loading, so this may not work if a really early startTs is specified.
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
this.metaClient = metaClient;
setInstants(loadInstants(new StartTsFilter(startTs), true,
record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString())));
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
}

/**
* For serialization and de-serialization only.
*
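The constructor added above passes a record-level predicate to loadInstants so that only COMPLETED instants are materialized. As a standalone illustration (assuming the archived records are Avro GenericRecords carrying an "actionState" field, matching the ACTION_STATE constant in this class), the predicate has roughly this shape:

import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.table.timeline.HoodieInstant;

public class CompletedFilterSketch {
  // Mirrors the lambda in the new constructor: keep a record only if its action state is COMPLETED.
  static final Function<GenericRecord, Boolean> COMPLETED_ONLY =
      record -> HoodieInstant.State.COMPLETED.toString().equals(record.get("actionState").toString());
}

Note also that the second argument passed to loadInstants is true, which appears to request eager loading of instant details for the bounded range at construction time.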
@@ -300,6 +313,19 @@ public boolean isInRange(HoodieInstant instant) {
}
}

private static class StartTsFilter extends TimeRangeFilter {
private final String startTs;

public StartTsFilter(String startTs) {
super(startTs, null); // endTs is never used
this.startTs = startTs;
}

public boolean isInRange(HoodieInstant instant) {
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN_OR_EQUALS, startTs);
}
}

/**
* Sort files by reverse order of version suffix in file name.
*/
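StartTsFilter above is a private nested class, so it cannot be used directly from outside; the sketch below only demonstrates the inclusive lower-bound comparison it performs, using the public HoodieTimeline.compareTimestamps helper that the filter itself calls. The timestamps are made up for illustration.

import org.apache.hudi.common.table.timeline.HoodieTimeline;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;

public class StartTsBoundarySketch {
  public static void main(String[] args) {
    String startTs = "20220101000000";
    // An instant timestamp equal to startTs is in range (inclusive lower bound).
    System.out.println(HoodieTimeline.compareTimestamps("20220101000000", GREATER_THAN_OR_EQUALS, startTs)); // true
    // An earlier timestamp is filtered out.
    System.out.println(HoodieTimeline.compareTimestamps("20211231235959", GREATER_THAN_OR_EQUALS, startTs)); // false
  }
}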
@@ -330,7 +356,7 @@ public HoodieDefaultTimeline getWriteTimeline() {
// filter in-memory instants
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
return new HoodieDefaultTimeline(getInstants().filter(i ->
readCommits.keySet().contains(i.getTimestamp()))
readCommits.containsKey(i.getTimestamp()))
.filter(s -> validActions.contains(s.getAction())), details);
}
}
@@ -250,22 +250,12 @@ private List<HoodieCommitMetadata> getArchivedMetadata(
InstantRange instantRange,
HoodieTimeline commitTimeline,
String tableName) {
if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
// read the archived metadata if:
// 1. the start commit is 'earliest';
// 2. the start instant is archived.
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
// read the archived metadata if the start instant is archived.
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant());
HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
if (!archivedCompleteTimeline.empty()) {
final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
if (instantRange != null) {
archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
} else {
final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
}
return maySkipCompaction(instantStream)
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
}
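Taken together, the Flink-side change above simplifies to roughly the flow below. This is a condensed, non-runnable sketch of the surrounding method, not the full diff: variable names follow the code above, instantRange is assumed non-null by the time this branch runs, and the explicit loadInstantDetailsInMemory calls can be dropped because the bounded HoodieArchivedTimeline constructor appears to load instant details for the requested range up front (the boolean true passed to loadInstants).

// Condensed sketch of the new read path in getArchivedMetadata:
if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
  // Only archived instants at or after the start instant are loaded, so older archive files are skipped.
  HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant());
  HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
  if (!archivedCompleteTimeline.empty()) {
    return maySkipCompaction(archivedCompleteTimeline.getInstants())
        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline))
        .collect(Collectors.toList());
  }
}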