Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -97,6 +98,10 @@ public class HoodieTableMetaClient implements Serializable {

public static final String MARKER_EXTN = ".marker";

// In-memory cache for archived timeline based on the start instant time
// Only one entry should be present in this map
private final Map<String, HoodieArchivedTimeline> archivedTimelineMap = new HashMap<>();

// NOTE: Since those two parameters lay on the hot-path of a lot of computations, we
// use tailored extension of the {@code Path} class allowing to avoid repetitive
// computations secured by its immutability
Expand All @@ -109,7 +114,6 @@ public class HoodieTableMetaClient implements Serializable {
private TimelineLayoutVersion timelineLayoutVersion;
protected HoodieTableConfig tableConfig;
protected HoodieActiveTimeline activeTimeline;
private HoodieArchivedTimeline archivedTimeline;
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
protected HoodieMetastoreConfig metastoreConfig;
Expand Down Expand Up @@ -371,10 +375,7 @@ public FileSystemRetryConfig getFileSystemRetryConfig() {
* @return Archived commit timeline
*/
public synchronized HoodieArchivedTimeline getArchivedTimeline() {
  // Empty start instant means "from the beginning of the archived timeline";
  // delegates to the caching overload so repeated calls reuse one instance.
  return getArchivedTimeline(StringUtils.EMPTY_STRING);
}

public HoodieMetastoreConfig getMetastoreConfig() {
Expand All @@ -385,21 +386,49 @@ public HoodieMetastoreConfig getMetastoreConfig() {
}

/**
 * Returns the cached archived timeline from startTs (inclusive).
 *
 * @param startTs The start instant time (inclusive) of the archived timeline.
 * @return the archived timeline.
*/
public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
  // Uses the in-memory cache by default; callers that need a fresh instance
  // should use the overload with useCache=false.
  return getArchivedTimeline(startTs, true);
}

/**
* Returns the cached archived timeline if using in-memory cache or a fresh new archived
* timeline if not using cache, from startTs (inclusive).
* <p>
 * Instantiating an archived timeline is a costly operation if a very early startTs is
 * specified.
* <p>
* This method is not thread safe.
*
* @param startTs The start instant time (inclusive) of the archived timeline.
* @param useCache Whether to use in-memory cache.
* @return the archived timeline based on the arguments.
*/
public HoodieArchivedTimeline getArchivedTimeline(String startTs, boolean useCache) {
  if (!useCache) {
    // Fresh instance every time, never stored in the cache.
    return instantiateArchivedTimeline(startTs);
  }
  HoodieArchivedTimeline cached = archivedTimelineMap.get(startTs);
  if (cached == null) {
    // Keep at most one entry in the cache: evict any entry for a different
    // start instant before inserting the new one.
    archivedTimelineMap.clear();
    cached = instantiateArchivedTimeline(startTs);
    archivedTimelineMap.put(startTs, cached);
  }
  return cached;
}

private HoodieArchivedTimeline instantiateArchivedTimeline(String startTs) {
  // A null/empty start instant means loading the entire archived timeline.
  if (StringUtils.isNullOrEmpty(startTs)) {
    return new HoodieArchivedTimeline(this);
  }
  return new HoodieArchivedTimeline(this, startTs);
}

/**
* Validate table properties.
*
* @param properties Properties from writeConfig.
*/
public void validateTableProperties(Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,30 @@ public static HoodieDefaultTimeline getTimeline(HoodieTableMetaClient metaClient
return activeTimeline;
}

/**
* Returns a Hudi timeline with commits after the given instant time (exclusive).
*
* @param metaClient {@link HoodieTableMetaClient} instance.
* @param exclusiveStartInstantTime Start instant time (exclusive).
* @return Hudi timeline.
*/
public static HoodieTimeline getCommitsTimelineAfter(
HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieDefaultTimeline timeline =
activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yihua I have a doubt, since rollback and commit are archived separately, is it possible that there is a very early rollback instant, causing activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime) to return false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BruceKellan You bring up a good point. So you are talking about the following case where the timeline is like:

ts3.rollback, ts50.commit, ts51.commit, ts52.commit, ...

and ts49.commit and ts48.commit are archived.

If we pass in ts47 as the exclusiveStartInstantTime at this point, activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime) returns false. However, the active timeline misses ts48.commit and ts49.commit which are required for meta sync. This is a problem if ts48.commit or ts49.commit has partition changes.

I'll put up a fix on that. Thanks for catching it!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the fix: #8991

? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
.mergeTimeline(activeTimeline)
: activeTimeline;
return timeline.getCommitsTimeline()
.findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
}

/**
 * Returns the commit metadata of the given instant.
 *
 * @param instant  The hoodie instant
 * @param timeline The timeline
 * @return the commit metadata
*/
public static HoodieCommitMetadata getCommitMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
Expand All @@ -51,10 +53,21 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests {@link TimelineUtils}.
Expand Down Expand Up @@ -119,7 +132,7 @@ public void testGetPartitions() throws IOException {
String olderPartition = "0"; // older partitions that is modified by all cleans
for (int i = 1; i <= 5; i++) {
String ts = i + "";
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));

Expand Down Expand Up @@ -158,7 +171,7 @@ public void testGetPartitionsUnPartitioned() throws IOException {
String partitionPath = "";
for (int i = 1; i <= 5; i++) {
String ts = i + "";
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap())));

Expand Down Expand Up @@ -187,7 +200,7 @@ public void testRestoreInstants() throws Exception {
String ts = i + "";
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, ts);
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2, HoodieTimeline.COMMIT_ACTION)));
activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2, COMMIT_ACTION)));
}

metaClient.reloadActiveTimeline();
Expand All @@ -210,12 +223,12 @@ public void testGetExtraMetadata() throws Exception {
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent());

String ts = "0";
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));

ts = "1";
instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
Map<String, String> extraMetadata = new HashMap<>();
extraMetadata.put(extraMetadataKey, extraMetadataValue1);
Expand Down Expand Up @@ -251,6 +264,81 @@ public void testGetExtraMetadata() throws Exception {
assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
}

@Test
public void testGetCommitsTimelineAfter() throws IOException {
  // Case 1: the exclusive start instant ("010") lies within the active
  // timeline, so only the active timeline should be consulted and the
  // archived timeline must never be loaded.
  String startTs = "010";
  HoodieTableMetaClient metaClient = prepareMetaClient(
      Arrays.asList(
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
      Arrays.asList(
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
      startTs);
  verifyTimeline(
      Arrays.asList(
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
      TimelineUtils.getCommitsTimelineAfter(metaClient, startTs));
  verify(metaClient, never()).getArchivedTimeline(any());

  // Case 2: the exclusive start instant ("001") predates the active timeline,
  // so both the archived and the active timelines should be merged, and the
  // archived timeline should be loaded exactly once.
  startTs = "001";
  metaClient = prepareMetaClient(
      Arrays.asList(
          new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
      Arrays.asList(
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
      startTs);
  verifyTimeline(
      Arrays.asList(
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
          new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
      TimelineUtils.getCommitsTimelineAfter(metaClient, startTs));
  verify(metaClient, times(1)).getArchivedTimeline(any());
}

/**
 * Builds a mocked {@link HoodieTableMetaClient} whose active timeline contains
 * {@code activeInstants} and whose (mocked) archived timeline, when merged with
 * the active one, yields the archived instants at or after {@code startTs}.
 */
private HoodieTableMetaClient prepareMetaClient(
    List<HoodieInstant> activeInstants,
    List<HoodieInstant> archivedInstants,
    String startTs
) throws IOException {
  HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
  HoodieArchivedTimeline archivedTimeline = mock(HoodieArchivedTimeline.class);
  // The active timeline reads its instants from the (stubbed) file system scan.
  when(metaClient.scanHoodieInstantsFromFileSystem(any(), eq(true)))
      .thenReturn(activeInstants);
  HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient);
  when(metaClient.getActiveTimeline()).thenReturn(activeTimeline);
  when(metaClient.getArchivedTimeline(any())).thenReturn(archivedTimeline);
  // Pre-compute the merge result: archived instants at/after startTs plus the
  // active timeline.
  HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline(
      archivedInstants.stream()
          .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0),
      i -> Option.empty())
      .mergeTimeline(activeTimeline);
  when(archivedTimeline.mergeTimeline(eq(activeTimeline)))
      .thenReturn(mergedTimeline);
  return metaClient;
}

/** Asserts that the timeline contains exactly the expected instants, ignoring order. */
public void verifyTimeline(List<HoodieInstant> expectedInstants, HoodieTimeline timeline) {
  List<HoodieInstant> expected =
      expectedInstants.stream().sorted().collect(Collectors.toList());
  List<HoodieInstant> actual =
      timeline.getInstants().stream().sorted().collect(Collectors.toList());
  assertEquals(expected, actual);
}

private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) {
final Option<String> extraLatestValue;
if (includeClustering) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hudi.source;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
Expand All @@ -43,6 +42,7 @@
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -462,7 +462,7 @@ private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
}

private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
  // Bypass the meta client's archived-timeline cache (useCache=false): streaming
  // reads request varying start instants, so caching would just thrash.
  HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant, false);
  HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
  return filterInstantsByCondition(archivedCompleteTimeline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ public boolean isBootstrap() {
* Going through archive timeline is a costly operation, and it should be avoided unless some start time is given.
*/
public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) {
  // With a last-synced commit time, use the shared helper that merges in the
  // archived timeline when needed; otherwise fall back to the active timeline.
  HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
      ? TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get())
      : metaClient.getActiveTimeline();
  return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}

Expand Down Expand Up @@ -126,10 +125,7 @@ public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynce
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
return TimelineUtils.getWrittenPartitions(
metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
.mergeTimeline(metaClient.getActiveTimeline())
.getCommitsTimeline()
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get()));
}
}

Expand Down