diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 740d569a590bd..4c1eac79dc413 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -156,7 +156,7 @@ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) */ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - fs = null; // will be lazily inited + fs = null; // will be lazily initialized } private void writeObject(java.io.ObjectOutputStream out) throws IOException { @@ -330,7 +330,7 @@ public FileSystemRetryConfig getFileSystemRetryConfig() { * Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read. * This should not be used, unless for historical debugging purposes. * - * @return Active commit timeline + * @return Archived commit timeline */ public synchronized HoodieArchivedTimeline getArchivedTimeline() { if (archivedTimeline == null) { @@ -339,6 +339,20 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() { return archivedTimeline; } + /** + * Returns fresh new archived commits as a timeline from startTs (inclusive). + * + *

This is a costly operation if a really early startTs is specified. + * Be cautious to use this only when the time range is short. + + *

This method is not thread safe. + * + * @return Archived commit timeline + */ + public HoodieArchivedTimeline getArchivedTimeline(String startTs) { + return new HoodieArchivedTimeline(this, startTs); + } + /** * Validate table properties. * @param properties Properties from writeConfig. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 5ad3fa7a9f215..29f166530cdb4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -79,13 +79,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { private static final String ACTION_TYPE_KEY = "actionType"; private static final String ACTION_STATE = "actionState"; private HoodieTableMetaClient metaClient; - private Map readCommits = new HashMap<>(); + private final Map readCommits = new HashMap<>(); private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class); /** - * Loads instants between (startTs, endTs]. - * Note that there is no lazy loading, so this may not work if really long time range (endTs-startTs) is specified. + * Loads all the archived instants. + * Note that there is no lazy loading, so this may not work if the archived timeline range is really long. * TBD: Should we enforce maximum time range? */ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) { @@ -96,6 +96,19 @@ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) { this.details = (Function> & Serializable) this::getInstantDetails; } + /** + * Loads completed instants from startTs(inclusive). + * Note that there is no lazy loading, so this may not work if really early startTs is specified. 
+ */ + public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) { + this.metaClient = metaClient; + setInstants(loadInstants(new StartTsFilter(startTs), true, + record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString()))); + // multiple casts will make this lambda serializable - + // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16 + this.details = (Function> & Serializable) this::getInstantDetails; + } + /** * For serialization and de-serialization only. * @@ -300,6 +313,19 @@ public boolean isInRange(HoodieInstant instant) { } } + private static class StartTsFilter extends TimeRangeFilter { + private final String startTs; + + public StartTsFilter(String startTs) { + super(startTs, null); // endTs is never used + this.startTs = startTs; + } + + public boolean isInRange(HoodieInstant instant) { + return HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN_OR_EQUALS, startTs); + } + } + /** * Sort files by reverse order of version suffix in file name. 
*/ @@ -330,7 +356,7 @@ public HoodieDefaultTimeline getWriteTimeline() { // filter in-memory instants Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION); return new HoodieDefaultTimeline(getInstants().filter(i -> - readCommits.keySet().contains(i.getTimestamp())) + readCommits.containsKey(i.getTimestamp())) .filter(s -> validActions.contains(s.getAction())), details); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 58c38ef56744e..02e0e253cf577 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -250,22 +250,12 @@ private List getArchivedMetadata( InstantRange instantRange, HoodieTimeline commitTimeline, String tableName) { - if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) { - // read the archived metadata if: - // 1. the start commit is 'earliest'; - // 2. the start instant is archived. - HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); + if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) { + // read the archived metadata if the start instant is archived. 
+ HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant()); HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); if (!archivedCompleteTimeline.empty()) { - final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp(); Stream instantStream = archivedCompleteTimeline.getInstants(); - if (instantRange != null) { - archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs); - instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant())); - } else { - final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp(); - archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); - } return maySkipCompaction(instantStream) .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList()); }