diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index cae5dbbad0107..8efd3a2268921 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -400,9 +400,14 @@ private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieI
         break;
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION: {
-        HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
-        archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        if (hoodieInstant.isRequested()) {
+          archivedMetaWrapper.setHoodieRequestedReplaceMetadata(
+              TimelineMetadataUtils.deserializeRequestedReplaceMetadata(commitTimeline.getInstantDetails(hoodieInstant).get()));
+        } else if (hoodieInstant.isCompleted()) {
+          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
+          archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        }
         archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
         break;
       }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 60f605c69b6bf..042013d3af3e5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -495,11 +496,16 @@ private void createReplaceMetadata(String instantTime) throws Exception {
     String fileId2 = "file-" + instantTime + "-2";
 
     // create replace instant to mark fileId1 as deleted
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
+        .setVersion(1)
+        .setExtraMetadata(Collections.emptyMap())
+        .build();
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
     replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
     HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, replaceMetadata)
+        .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata)
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }
 
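// Note on the HoodieTimelineArchiveLog change above: a replacecommit instant can now be
// archived from either its requested state (which carries the HoodieRequestedReplaceMetadata
// plan) or its completed state (which carries the HoodieReplaceCommitMetadata result), while
// an inflight instant contributes only its action type. A minimal, self-contained sketch of
// that dispatch, using simplified stand-in types (State, ArchivedEntry and the String payloads
// below are hypothetical placeholders, not the Hudi classes):
import java.util.Optional;

class ReplaceArchivalSketch {

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  static class ArchivedEntry {
    Optional<String> requestedReplaceMetadata = Optional.empty(); // stand-in for the Avro plan
    Optional<String> replaceCommitMetadata = Optional.empty();    // stand-in for the commit result
    final String actionType = "replacecommit";
  }

  // Archive whichever payload the instant state actually carries.
  static ArchivedEntry archiveReplaceInstant(State state, String instantDetails) {
    ArchivedEntry entry = new ArchivedEntry();
    if (state == State.REQUESTED) {
      entry.requestedReplaceMetadata = Optional.of(instantDetails);
    } else if (state == State.COMPLETED) {
      entry.replaceCommitMetadata = Optional.of(instantDetails);
    }
    return entry;
  }

  public static void main(String[] args) {
    ArchivedEntry entry = archiveReplaceInstant(State.REQUESTED, "{\"operationType\": \"CLUSTER\"}");
    System.out.println(entry.requestedReplaceMetadata.isPresent()); // true
    System.out.println(entry.replaceCommitMetadata.isPresent());    // false
  }
}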
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 1119f2665b3aa..fd578bd8b70bc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -24,9 +24,14 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieCleanStat;
@@ -845,7 +850,8 @@ public void testCleanWithReplaceCommits() throws Exception {
     // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
     Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
     String file2P0C1 = partitionAndFileId002.get(p0);
-    testTable.addReplaceCommit("00000000000002", generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1));
+    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata = generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1);
+    testTable.addReplaceCommit("00000000000002", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
@@ -857,7 +863,8 @@ public void testCleanWithReplaceCommits() throws Exception {
     // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
     Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
     String file3P1C2 = partitionAndFileId003.get(p1);
-    testTable.addReplaceCommit("00000000000003", generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2));
+    replaceMetadata = generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2);
+    testTable.addReplaceCommit("00000000000003", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
@@ -870,7 +877,8 @@ public void testCleanWithReplaceCommits() throws Exception {
     // make next replacecommit, with 1 clustering operation. Replace data in p0 again
     Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
     String file4P0C3 = partitionAndFileId004.get(p0);
-    testTable.addReplaceCommit("00000000000004", generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3));
+    replaceMetadata = generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3);
+    testTable.addReplaceCommit("00000000000004", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
@@ -884,7 +892,8 @@ public void testCleanWithReplaceCommits() throws Exception {
     // make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
     Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
     String file4P1C4 = partitionAndFileId005.get(p1);
-    testTable.addReplaceCommit("00000000000005", generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4));
+    replaceMetadata = generateReplaceCommitMetadata(p0, file3P1C2, file4P1C4);
+    testTable.addReplaceCommit("00000000000005", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
     assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
@@ -894,7 +903,23 @@ public void testCleanWithReplaceCommits() throws Exception {
     assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
   }
 
-  private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partition, String replacedFileId, String newFileId) {
+  private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(String partition,
+                                                                                                          String replacedFileId,
+                                                                                                          String newFileId) {
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
+    requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
+    requestedReplaceMetadata.setVersion(1);
+    HoodieSliceInfo sliceInfo = HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
+    List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
+    clusteringGroups.add(HoodieClusteringGroup.newBuilder()
+        .setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
+        .setSlices(Collections.singletonList(sliceInfo)).build());
+    requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
+    requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
+        .setVersion(1).setExtraMetadata(Collections.emptyMap())
+        .setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
+        .setInputGroups(clusteringGroups).build());
+
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(partition, replacedFileId);
     replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
@@ -905,7 +930,7 @@ private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partiti
       writeStat.setFileId(newFileId);
       replaceMetadata.addWriteStat(partition, writeStat);
     }
-    return replaceMetadata;
+    return Pair.of(requestedReplaceMetadata, replaceMetadata);
   }
 
   @Test
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 41588ca8996c6..af6963187ae22 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -77,7 +77,6 @@
             <import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieRestoreMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieReplaceCommitMetadata.avsc</import>
-            <import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
             <import>${basedir}/src/main/avro/HoodiePath.avsc</import>
             <import>${basedir}/src/main/avro/HoodieFSPermission.avsc</import>
             <import>${basedir}/src/main/avro/HoodieFileStatus.avsc</import>
@@ -90,6 +89,7 @@
             <import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
             <import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
           </imports>
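// Why the hudi-common pom reorder above matters: avro-maven-plugin hands the listed <import>
// schemas to a shared parser in order, so HoodieArchivedMetaEntry.avsc can only resolve the
// HoodieRequestedReplaceMetadata name after that schema has been parsed, which is why it moves
// to the end of the list. A small stand-alone illustration of the same ordering rule with
// Avro's Schema.Parser (the two schema strings here are simplified stand-ins, not the real
// Hudi schemas):
import org.apache.avro.Schema;

public class SchemaImportOrderSketch {
  public static void main(String[] args) {
    Schema.Parser parser = new Schema.Parser();
    // Parse the referenced record first; the parser remembers named types across calls.
    parser.parse("{\"type\":\"record\",\"name\":\"HoodieRequestedReplaceMetadata\","
        + "\"fields\":[{\"name\":\"operationType\",\"type\":\"string\"}]}");
    // A record whose union field refers to the earlier type by name. Swapping the two
    // parse calls fails with an undefined-name error, mirroring what the old import
    // order would do once the new union member is added in the schema change below.
    Schema archived = parser.parse("{\"type\":\"record\",\"name\":\"HoodieArchivedMetaEntry\","
        + "\"fields\":[{\"name\":\"hoodieRequestedReplaceMetadata\","
        + "\"type\":[\"null\",\"HoodieRequestedReplaceMetadata\"],\"default\":null}]}");
    System.out.println(archived.getField("hoodieRequestedReplaceMetadata").schema());
  }
}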
"type":[ + "null", + "HoodieRequestedReplaceMetadata" + ], + "default": null } ] } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 4beba35a58138..5f03ef015faf0 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -21,6 +21,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodiePartitionMetadata; @@ -28,6 +29,7 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.exception.HoodieException; @@ -147,8 +149,9 @@ public static void createReplaceCommit(String basePath, String instantTime, Hood createMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8)); } - public static void createRequestedReplaceCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION); + public static void createRequestedReplaceCommit(String basePath, String instantTime, HoodieRequestedReplaceMetadata requestedMetadata) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION, + TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedMetadata).get()); } public static void createInflightReplaceCommit(String basePath, String instantTime) throws IOException { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 858e113734b58..46ef14f2077a0 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; @@ -163,8 +164,8 @@ public HoodieTestTable addDeltaCommit(String instantTime) throws Exception { return this; } - public HoodieTestTable addReplaceCommit(String instantTime, HoodieReplaceCommitMetadata metadata) throws Exception { - createRequestedReplaceCommit(basePath, instantTime); + public HoodieTestTable addReplaceCommit(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, HoodieReplaceCommitMetadata metadata) throws Exception { + createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata); createInflightReplaceCommit(basePath, instantTime); 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 858e113734b58..46ef14f2077a0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -163,8 +164,8 @@ public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
     return this;
   }
 
-  public HoodieTestTable addReplaceCommit(String instantTime, HoodieReplaceCommitMetadata metadata) throws Exception {
-    createRequestedReplaceCommit(basePath, instantTime);
+  public HoodieTestTable addReplaceCommit(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, HoodieReplaceCommitMetadata metadata) throws Exception {
+    createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata);
     createInflightReplaceCommit(basePath, instantTime);
     createReplaceCommit(basePath, instantTime, metadata);
     currentInstantTime = instantTime;
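// Taken together, the test-utility changes mean callers now supply both halves of a replace
// commit: the requested plan and the completed metadata. A usage sketch following the pattern
// of the TestHoodieTimelineArchiveLog change above (the instant time, partition and file id
// are made-up values, and metaClient is assumed to point at an initialized test table):
import java.util.Collections;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestTable;

class AddReplaceCommitUsageSketch {
  static void addReplaceCommit(HoodieTableMetaClient metaClient) throws Exception {
    HoodieRequestedReplaceMetadata requested = HoodieRequestedReplaceMetadata.newBuilder()
        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
        .setVersion(1)
        .setExtraMetadata(Collections.emptyMap())
        .build();

    HoodieReplaceCommitMetadata completed = new HoodieReplaceCommitMetadata();
    completed.addReplaceFileId("2020/03/15", "file-00000000000006-1");
    completed.setOperationType(WriteOperationType.INSERT_OVERWRITE);

    // Creates <instant>.replacecommit.requested (now containing the serialized requested
    // metadata), <instant>.replacecommit.inflight and the completed <instant>.replacecommit.
    HoodieTestTable.of(metaClient).addReplaceCommit("00000000000006", requested, completed);
  }
}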