diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index f9dacf03a5d8d..de8c5821c1118 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -22,9 +22,12 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collection; @@ -43,6 +46,7 @@ * 2) Incremental reads - InputFormats can use this API to query */ public class TimelineUtils { + private static final Logger LOG = LogManager.getLogger(TimelineUtils.class); /** * Returns partitions that have new data strictly after commitTime. @@ -117,13 +121,27 @@ public static List getAffectedPartitions(HoodieTimeline timeline) { } /** - * Get extra metadata for specified key from latest commit/deltacommit instant. + * Get extra metadata for specified key from latest commit/deltacommit/replacecommit (e.g. insert_overwrite) instant. 
*/ public static Option getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) { - return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst().map(instant -> + return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() + // exclude clustering commits for returning user stored extra metadata + .filter(instant -> !isClusteringCommit(metaClient, instant)) + .findFirst().map(instant -> getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty()); } + + /** + * Get extra metadata for specified key from latest commit/deltacommit/replacecommit instant including internal commits + * such as clustering. + */ + public static Option getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient, String extraMetadataKey) { + return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() + .findFirst().map(instant -> + getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty()); + } + /** * Get extra metadata for specified key from all active commit/deltacommit instants. 
*/ @@ -134,6 +152,7 @@ public static Map> getAllExtraMetadataForKey(HoodieTableM private static Option getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) { try { + LOG.info("reading checkpoint info for:" + instant + " key: " + extraMetadataKey); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( metaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class); @@ -142,4 +161,20 @@ private static Option getMetadataValue(HoodieTableMetaClient metaClient, throw new HoodieIOException("Unable to parse instant metadata " + instant, e); } } + + private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) { + try { + if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) { + // replacecommit is used for multiple operations: insert_overwrite/cluster etc. + // Check operation type to see if this instant is related to clustering. + HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes( + metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); + return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType()); + } + + return false; + } catch (IOException e) { + throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e); + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 18c0d3f20a659..cf7f6d849b7d4 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import 
org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -73,7 +74,8 @@ public void testGetPartitionsWithReplaceCommits() throws IOException { activeTimeline.createNewInstant(instant1); // create replace metadata only with replaced file Ids (no new files created) activeTimeline.saveAsComplete(instant1, - Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, newFilePartition, 0, Collections.emptyMap()))); + Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, + newFilePartition, 0, Collections.emptyMap(), WriteOperationType.CLUSTER))); metaClient.reloadActiveTimeline(); List partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10)); @@ -85,7 +87,8 @@ public void testGetPartitionsWithReplaceCommits() throws IOException { activeTimeline.createNewInstant(instant2); // create replace metadata only with replaced file Ids (no new files created) activeTimeline.saveAsComplete(instant2, - Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, newFilePartition, 3, Collections.emptyMap()))); + Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, + newFilePartition, 3, Collections.emptyMap(), WriteOperationType.CLUSTER))); metaClient.reloadActiveTimeline(); partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(1, partitions.size()); @@ -211,16 +214,42 @@ public void testGetExtraMetadata() throws Exception { metaClient.reloadActiveTimeline(); // verify modified partitions included cleaned data - Option extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey); - assertTrue(extraLatestValue.isPresent()); - 
assertEquals(extraMetadataValue1, extraLatestValue.get()); + verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, false); + assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent()); + + // verify adding clustering commit doesn't change behavior of getExtraMetadataFromLatest + String ts2 = "2"; + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts2); + activeTimeline.createNewInstant(instant2); + String newValueForMetadata = "newValue2"; + extraMetadata.put(extraMetadataKey, newValueForMetadata); + activeTimeline.saveAsComplete(instant2, + Option.of(getReplaceCommitMetadata(basePath, ts2, "p2", 0, + "p2", 3, extraMetadata, WriteOperationType.CLUSTER))); + metaClient.reloadActiveTimeline(); + + verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, false); + verifyExtraMetadataLatestValue(extraMetadataKey, newValueForMetadata, true); assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent()); Map> extraMetadataEntries = TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey); - assertEquals(2, extraMetadataEntries.size()); + assertEquals(3, extraMetadataEntries.size()); assertFalse(extraMetadataEntries.get("0").isPresent()); assertTrue(extraMetadataEntries.get("1").isPresent()); assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get()); + assertTrue(extraMetadataEntries.get("2").isPresent()); + assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get()); + } + + private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) { + final Option extraLatestValue; + if (includeClustering) { + extraLatestValue = TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient, extraMetadataKey); + } else { + extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey); + } + assertTrue(extraLatestValue.isPresent()); + 
assertEquals(expected, extraLatestValue.get()); } private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count, String actionType) throws IOException { @@ -265,9 +294,11 @@ private byte[] getCommitMetadata(String basePath, String partition, String commi } private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount, - String newFilePartition, int newFileCount, Map extraMetadata) + String newFilePartition, int newFileCount, Map extraMetadata, + WriteOperationType operationType) throws IOException { HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata(); + commit.setOperationType(operationType); for (int i = 1; i <= newFileCount; i++) { HoodieWriteStat stat = new HoodieWriteStat(); stat.setFileId(i + "");