@@ -22,9 +22,12 @@
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Collection;
@@ -43,6 +46,7 @@
* 2) Incremental reads - InputFormats can use this API to query
*/
public class TimelineUtils {
private static final Logger LOG = LogManager.getLogger(TimelineUtils.class);

/**
* Returns partitions that have new data strictly after commitTime.
@@ -117,13 +121,27 @@ public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
}

/**
- * Get extra metadata for specified key from latest commit/deltacommit instant.
+ * Get extra metadata for specified key from latest commit/deltacommit/replacecommit (e.g. insert_overwrite) instant.
*/
public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
- return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst().map(instant ->
+ return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+ // exclude clustering commits when returning user-stored extra metadata
+ .filter(instant -> !isClusteringCommit(metaClient, instant))
+ .findFirst().map(instant ->
getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
}


/**
* Get extra metadata for the specified key from the latest commit/deltacommit/replacecommit instant, including internal
* commits such as clustering.
*/
public static Option<String> getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient, String extraMetadataKey) {
return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
.findFirst().map(instant ->
getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
}

/**
* Get extra metadata for specified key from all active commit/deltacommit instants.
*/
@@ -134,6 +152,7 @@ public static Map<String, Option<String>> getAllExtraMetadataForKey(HoodieTableM

private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
try {
LOG.info("Reading checkpoint info for: " + instant + " key: " + extraMetadataKey);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
metaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);

@@ -142,4 +161,20 @@ private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient,
throw new HoodieIOException("Unable to parse instant metadata " + instant, e);
}
}

private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
try {
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
// replacecommit is used for multiple operations: insert_overwrite/cluster etc.
// Check operation type to see if this instant is related to clustering.
HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
}

return false;
} catch (IOException e) {
throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
}
}
}
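For context, a minimal usage sketch of the two lookups added above; this is not code from the PR. It assumes a HoodieTableMetaClient has already been built for the table, uses an illustrative metadata key name, and assumes the usual hudi-common package layout for the imports.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;

public class ExtraMetadataLookupSketch {

  // Hypothetical key; callers such as DeltaStreamer define their own checkpoint key.
  private static final String METADATA_KEY = "my.extra.metadata.key";

  public static void printLatestValues(HoodieTableMetaClient metaClient) {
    // Skips clustering replacecommits, so metadata written by regular
    // commit/deltacommit/insert_overwrite instants is not shadowed by a later clustering instant.
    Option<String> userValue =
        TimelineUtils.getExtraMetadataFromLatest(metaClient, METADATA_KEY);

    // Reads the very latest completed instant, clustering included.
    Option<String> latestValue =
        TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient, METADATA_KEY);

    System.out.println("latest user-written value: " + userValue);
    System.out.println("latest value incl. clustering: " + latestValue);
  }
}

The hunks below update the accompanying test to exercise both lookups and to pass an explicit WriteOperationType when building replace-commit metadata.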
@@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -73,7 +74,8 @@ public void testGetPartitionsWithReplaceCommits() throws IOException {
activeTimeline.createNewInstant(instant1);
// create replace metadata only with replaced file Ids (no new files created)
activeTimeline.saveAsComplete(instant1,
- Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, newFilePartition, 0, Collections.emptyMap())));
+ Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2,
+ newFilePartition, 0, Collections.emptyMap(), WriteOperationType.CLUSTER)));
metaClient.reloadActiveTimeline();

List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10));
@@ -85,7 +87,8 @@
activeTimeline.createNewInstant(instant2);
// create replace metadata only with replaced file Ids (no new files created)
activeTimeline.saveAsComplete(instant2,
- Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, newFilePartition, 3, Collections.emptyMap())));
+ Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0,
+ newFilePartition, 3, Collections.emptyMap(), WriteOperationType.CLUSTER)));
metaClient.reloadActiveTimeline();
partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
assertEquals(1, partitions.size());
@@ -211,16 +214,42 @@ public void testGetExtraMetadata() throws Exception {
metaClient.reloadActiveTimeline();

// verify modified partitions included cleaned data
- Option<String> extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
- assertTrue(extraLatestValue.isPresent());
- assertEquals(extraMetadataValue1, extraLatestValue.get());
+ verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, false);
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent());

// verify that adding a clustering commit doesn't change the behavior of getExtraMetadataFromLatest
String ts2 = "2";
HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts2);
activeTimeline.createNewInstant(instant2);
String newValueForMetadata = "newValue2";
extraMetadata.put(extraMetadataKey, newValueForMetadata);
activeTimeline.saveAsComplete(instant2,
Option.of(getReplaceCommitMetadata(basePath, ts2, "p2", 0,
"p2", 3, extraMetadata, WriteOperationType.CLUSTER)));
metaClient.reloadActiveTimeline();

verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, false);
verifyExtraMetadataLatestValue(extraMetadataKey, newValueForMetadata, true);
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent());

Map<String, Option<String>> extraMetadataEntries = TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey);
- assertEquals(2, extraMetadataEntries.size());
+ assertEquals(3, extraMetadataEntries.size());
assertFalse(extraMetadataEntries.get("0").isPresent());
assertTrue(extraMetadataEntries.get("1").isPresent());
assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get());
assertTrue(extraMetadataEntries.get("2").isPresent());
assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
}

private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) {
final Option<String> extraLatestValue;
if (includeClustering) {
extraLatestValue = TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient, extraMetadataKey);
} else {
extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
}
assertTrue(extraLatestValue.isPresent());
assertEquals(expected, extraLatestValue.get());
}

private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count, String actionType) throws IOException {
Expand Down Expand Up @@ -265,9 +294,11 @@ private byte[] getCommitMetadata(String basePath, String partition, String commi
}

private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount,
- String newFilePartition, int newFileCount, Map<String, String> extraMetadata)
+ String newFilePartition, int newFileCount, Map<String, String> extraMetadata,
+ WriteOperationType operationType)
throws IOException {
HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
commit.setOperationType(operationType);
for (int i = 1; i <= newFileCount; i++) {
HoodieWriteStat stat = new HoodieWriteStat();
stat.setFileId(i + "");
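As a closing illustration, here is a small sketch of the classification rule isClusteringCommit applies to replacecommit metadata. This is my own example, not code from the PR; it only uses the HoodieReplaceCommitMetadata setters and getters that appear in the diff above.

import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;

public class ReplaceCommitClassificationSketch {

  // Mirrors the check in isClusteringCommit: a replacecommit counts as clustering
  // only when its metadata records the CLUSTER operation type.
  static boolean isClustering(HoodieReplaceCommitMetadata replaceMetadata) {
    return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
  }

  public static void main(String[] args) {
    HoodieReplaceCommitMetadata clusterMeta = new HoodieReplaceCommitMetadata();
    clusterMeta.setOperationType(WriteOperationType.CLUSTER);

    HoodieReplaceCommitMetadata overwriteMeta = new HoodieReplaceCommitMetadata();
    overwriteMeta.setOperationType(WriteOperationType.INSERT_OVERWRITE);

    System.out.println(isClustering(clusterMeta));   // true  -> hidden from getExtraMetadataFromLatest
    System.out.println(isClustering(overwriteMeta)); // false -> still visible as user-written metadata
  }
}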