diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9dcd50c1cd838..70e9473db32fa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -97,6 +98,10 @@ public class HoodieTableMetaClient implements Serializable { public static final String MARKER_EXTN = ".marker"; + // In-memory cache for archived timeline based on the start instant time + // Only one entry should be present in this map + private final Map archivedTimelineMap = new HashMap<>(); + // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we // use tailored extension of the {@code Path} class allowing to avoid repetitive // computations secured by its immutability @@ -109,7 +114,6 @@ public class HoodieTableMetaClient implements Serializable { private TimelineLayoutVersion timelineLayoutVersion; protected HoodieTableConfig tableConfig; protected HoodieActiveTimeline activeTimeline; - private HoodieArchivedTimeline archivedTimeline; private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build(); protected HoodieMetastoreConfig metastoreConfig; @@ -371,10 +375,7 @@ public FileSystemRetryConfig getFileSystemRetryConfig() { * @return Archived commit timeline */ public synchronized HoodieArchivedTimeline getArchivedTimeline() { - if (archivedTimeline == null) { - archivedTimeline = new HoodieArchivedTimeline(this); - } - return archivedTimeline; + return 
getArchivedTimeline(StringUtils.EMPTY_STRING); } public HoodieMetastoreConfig getMetastoreConfig() { @@ -385,21 +386,49 @@ public HoodieMetastoreConfig getMetastoreConfig() { } /** - * Returns fresh new archived commits as a timeline from startTs (inclusive). - * - *

This is costly operation if really early endTs is specified. - * Be caution to use this only when the time range is short. - * - *

This method is not thread safe. + * Returns the cached archived timeline from startTs (inclusive). * - * @return Archived commit timeline + * @param startTs The start instant time (inclusive) of the archived timeline. + * @return the archived timeline. */ public HoodieArchivedTimeline getArchivedTimeline(String startTs) { - return new HoodieArchivedTimeline(this, startTs); + return getArchivedTimeline(startTs, true); + } + + /** + * Returns the cached archived timeline if using in-memory cache or a fresh new archived + * timeline if not using cache, from startTs (inclusive). + *

+ * Instantiating an archived timeline is a costly operation if a really early startTs is + * specified. + *

+ * This method is not thread safe. + * + * @param startTs The start instant time (inclusive) of the archived timeline. + * @param useCache Whether to use in-memory cache. + * @return the archived timeline based on the arguments. + */ + public HoodieArchivedTimeline getArchivedTimeline(String startTs, boolean useCache) { + if (useCache) { + if (!archivedTimelineMap.containsKey(startTs)) { + // Only keep one entry in the map + archivedTimelineMap.clear(); + archivedTimelineMap.put(startTs, instantiateArchivedTimeline(startTs)); + } + return archivedTimelineMap.get(startTs); + } + return instantiateArchivedTimeline(startTs); + } + + private HoodieArchivedTimeline instantiateArchivedTimeline(String startTs) { + return StringUtils.isNullOrEmpty(startTs) + ? new HoodieArchivedTimeline(this) + : new HoodieArchivedTimeline(this, startTs); } /** * Validate table properties. + * * @param properties Properties from writeConfig. */ public void validateTableProperties(Properties properties) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 6b517d6022f86..1f9d416b2b545 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -210,11 +210,30 @@ public static HoodieDefaultTimeline getTimeline(HoodieTableMetaClient metaClient return activeTimeline; } + /** + * Returns a Hudi timeline with commits after the given instant time (exclusive). + * + * @param metaClient {@link HoodieTableMetaClient} instance. + * @param exclusiveStartInstantTime Start instant time (exclusive). + * @return Hudi timeline. 
+ */ + public static HoodieTimeline getCommitsTimelineAfter( + HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieDefaultTimeline timeline = + activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime) + ? metaClient.getArchivedTimeline(exclusiveStartInstantTime) + .mergeTimeline(activeTimeline) + : activeTimeline; + return timeline.getCommitsTimeline() + .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE); + } + /** * Returns the commit metadata of the given instant. * - * @param instant The hoodie instant - * @param timeline The timeline + * @param instant The hoodie instant + * @param timeline The timeline * @return the commit metadata */ public static HoodieCommitMetadata getCommitMetadata( diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 842e7069ec129..5e91118b26962 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -30,6 +30,8 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -51,10 +53,21 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED; +import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests {@link TimelineUtils}. @@ -119,7 +132,7 @@ public void testGetPartitions() throws IOException { String olderPartition = "0"; // older partitions that is modified by all cleans for (int i = 1; i <= 5; i++) { String ts = i + ""; - HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap()))); @@ -158,7 +171,7 @@ public void testGetPartitionsUnPartitioned() throws IOException { String partitionPath = ""; for (int i = 1; i <= 5; i++) { String ts = i + ""; - HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap()))); @@ -187,7 +200,7 @@ public void testRestoreInstants() throws Exception { String ts = i + ""; HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, ts); activeTimeline.createNewInstant(instant); - activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2, 
HoodieTimeline.COMMIT_ACTION))); + activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2, COMMIT_ACTION))); } metaClient.reloadActiveTimeline(); @@ -210,12 +223,12 @@ public void testGetExtraMetadata() throws Exception { assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent()); String ts = "0"; - HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap()))); ts = "1"; - instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); Map extraMetadata = new HashMap<>(); extraMetadata.put(extraMetadataKey, extraMetadataValue1); @@ -251,6 +264,81 @@ public void testGetExtraMetadata() throws Exception { assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get()); } + @Test + public void testGetCommitsTimelineAfter() throws IOException { + // Should only load active timeline + String startTs = "010"; + HoodieTableMetaClient mockMetaClient = prepareMetaClient( + Arrays.asList( + new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")), + startTs + ); + verifyTimeline( + Arrays.asList( + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs)); + verify(mockMetaClient, never()).getArchivedTimeline(any()); + + // Should load both archived and active timeline 
+ startTs = "001"; + mockMetaClient = prepareMetaClient( + Arrays.asList( + new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")), + startTs + ); + verifyTimeline( + Arrays.asList( + new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs)); + verify(mockMetaClient, times(1)).getArchivedTimeline(any()); + } + + private HoodieTableMetaClient prepareMetaClient( + List activeInstants, + List archivedInstants, + String startTs + ) throws IOException { + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + HoodieArchivedTimeline mockArchivedTimeline = mock(HoodieArchivedTimeline.class); + when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true))) + .thenReturn(activeInstants); + HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(mockMetaClient); + when(mockMetaClient.getActiveTimeline()) + .thenReturn(activeTimeline); + when(mockMetaClient.getArchivedTimeline(any())) + .thenReturn(mockArchivedTimeline); + HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline( + archivedInstants.stream() + .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0), + i -> Option.empty()) + .mergeTimeline(activeTimeline); + when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline))) + .thenReturn(mergedTimeline); + + return mockMetaClient; + } + + public void verifyTimeline(List expectedInstants, HoodieTimeline timeline) { + assertEquals( + expectedInstants.stream().sorted().collect(Collectors.toList()), + 
timeline.getInstants().stream().sorted().collect(Collectors.toList()) + ); + } + private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) { final Option extraLatestValue; if (includeClustering) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 92ba50cf1950e..a5bab0e5759a1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -18,7 +18,6 @@ package org.apache.hudi.source; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; @@ -43,6 +42,7 @@ import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.table.types.logical.RowType; @@ -462,7 +462,7 @@ private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) { } private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) { - HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant); + HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant, false); HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); return filterInstantsByCondition(archivedCompleteTimeline); } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 56ff82f5e45a5..1db3707895f09 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -88,10 +88,9 @@ public boolean isBootstrap() { * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given. */ public Set getDroppedPartitionsSince(Option lastCommitTimeSynced) { - HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) - .mergeTimeline(metaClient.getActiveTimeline()) - .getCommitsTimeline() - .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline(); + HoodieTimeline timeline = lastCommitTimeSynced.isPresent() + ? TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get()) + : metaClient.getActiveTimeline(); return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline)); } @@ -126,10 +125,7 @@ public List getWrittenPartitionsSince(Option lastCommitTimeSynce } else { LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); return TimelineUtils.getWrittenPartitions( - metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) - .mergeTimeline(metaClient.getActiveTimeline()) - .getCommitsTimeline() - .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); + TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get())); } }