From 88ff43163d5966ae63f33f5058daa282788f50a9 Mon Sep 17 00:00:00 2001 From: Satish Kotha Date: Tue, 5 Jan 2021 11:21:25 -0800 Subject: [PATCH] [HUDI-1507] Change timeline utils to support reading replacecommit metadata --- .../common/table/timeline/TimelineUtils.java | 14 ++++ .../hudi/common/table/TestTimelineUtils.java | 64 +++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 95a2ae618cfc3..0490e4ec5eede 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -21,14 +21,17 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import java.io.IOException; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -63,6 +66,17 @@ public static List getAffectedPartitions(HoodieTimeline timeline) { return commitMetadata.getPartitionToWriteStats().keySet().stream(); } catch (IOException e) { throw new HoodieIOException("Failed to get partitions written at " + s, e); + } + case HoodieTimeline.REPLACE_COMMIT_ACTION: + try { + HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes( + timeline.getInstantDetails(s).get(), HoodieReplaceCommitMetadata.class); + Set partitions = new HashSet<>(); + partitions.addAll(commitMetadata.getPartitionToReplaceFileIds().keySet()); + partitions.addAll(commitMetadata.getPartitionToWriteStats().keySet()); + return partitions.stream(); + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions modified at " + s, e); } case HoodieTimeline.CLEAN_ACTION: try { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index e677f491f14d5..b79ffbbb529d9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -23,8 +23,10 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -58,6 +60,43 @@ public void setUp() throws Exception { initMetaClient(); } + @Test + public void testGetPartitionsWithReplaceCommits() throws IOException { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + assertTrue(activeCommitTimeline.empty()); + + String ts1 = "1"; + String replacePartition = "2021/01/01"; + String newFilePartition = "2021/01/02"; + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts1); + activeTimeline.createNewInstant(instant1); + // create replace metadata only with replaced file Ids (no new files created) + activeTimeline.saveAsComplete(instant1, + Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition,2, newFilePartition,0, Collections.emptyMap()))); + metaClient.reloadActiveTimeline(); + + List partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10)); + assertEquals(1, partitions.size()); + assertEquals(replacePartition, partitions.get(0)); + + String ts2 = "2"; + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts2); + activeTimeline.createNewInstant(instant2); + // create replace metadata only with replaced file Ids (no new files created) + activeTimeline.saveAsComplete(instant2, + Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition,0, newFilePartition,3, Collections.emptyMap()))); + metaClient.reloadActiveTimeline(); + partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); + assertEquals(1, partitions.size()); + assertEquals(newFilePartition, partitions.get(0)); + + partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10)); + assertEquals(2, partitions.size()); + assertTrue(partitions.contains(replacePartition)); + assertTrue(partitions.contains(newFilePartition)); + } + @Test public void testGetPartitions() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -224,6 +263,31 @@ private byte[] getCommitMetadata(String basePath, String partition, String commi return commit.toJsonString().getBytes(StandardCharsets.UTF_8); } + private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount, + String newFilePartition, int newFileCount, Map extraMetadata) + throws IOException { + HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata(); + for (int i = 1; i <= newFileCount; i++) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setFileId(i + ""); + stat.setPartitionPath(Paths.get(basePath, newFilePartition).toString()); + stat.setPath(commitTs + "." + i + ".parquet"); + commit.addWriteStat(newFilePartition, stat); + } + Map> partitionToReplaceFileIds = new HashMap<>(); + if (replaceCount > 0) { + partitionToReplaceFileIds.put(replacePartition, new ArrayList<>()); + } + for (int i = 1; i <= replaceCount; i++) { + partitionToReplaceFileIds.get(replacePartition).add(FSUtils.createNewFileIdPfx()); + } + commit.setPartitionToReplaceFileIds(partitionToReplaceFileIds); + for (Map.Entry extraEntries : extraMetadata.entrySet()) { + commit.addMetadata(extraEntries.getKey(), extraEntries.getValue()); + } + return commit.toJsonString().getBytes(StandardCharsets.UTF_8); + } + private Option getCleanMetadata(String partition, String time) throws IOException { Map partitionToFilesCleaned = new HashMap<>(); List filesDeleted = new ArrayList<>();