diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index e88f129747734..0ad209353db6d 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -24,7 +24,7 @@ import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; -import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadatGenerator; +import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadataGenerator; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -122,7 +122,7 @@ private LinkedHashMap generateMixedData() throws Excep for (Map.Entry entry : replaceCommitData.entrySet()) { String key = entry.getKey().getTimestamp(); Integer[] value = entry.getValue(); - HoodieTestReplaceCommitMetadatGenerator.createReplaceCommitFileWithMetadata(tablePath, key, + HoodieTestReplaceCommitMetadataGenerator.createReplaceCommitFileWithMetadata(tablePath, key, Option.of(value[0]), Option.of(value[1]), metaClient); } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadataGenerator.java similarity index 96% rename from hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java rename to hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadataGenerator.java index f7244f9fe5127..f7f556f34e947 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadataGenerator.java @@ -35,7 +35,7 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; import static org.apache.hudi.common.util.CollectionUtils.createImmutableList; -public class HoodieTestReplaceCommitMetadatGenerator extends HoodieTestCommitMetadataGenerator { +public class HoodieTestReplaceCommitMetadataGenerator extends HoodieTestCommitMetadataGenerator { public static void createReplaceCommitFileWithMetadata(String basePath, String commitTime, Option writes, Option updates, HoodieTableMetaClient metaclient) throws Exception { @@ -43,7 +43,7 @@ public static void createReplaceCommitFileWithMetadata(String basePath, String c UUID.randomUUID().toString(), writes, updates); HoodieRequestedReplaceMetadata requestedReplaceMetadata = getHoodieRequestedReplaceMetadata(); - HoodieTestTable.of(metaclient).addReplaceCommit(commitTime, requestedReplaceMetadata, replaceMetadata); + HoodieTestTable.of(metaclient).addReplaceCommit(commitTime, Option.ofNullable(requestedReplaceMetadata), Option.empty(), replaceMetadata); } private static HoodieRequestedReplaceMetadata getHoodieRequestedReplaceMetadata() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java index ee03bd7cbf841..5144f2b8350d4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java @@ -68,6 +68,11 @@ public static org.apache.hudi.avro.model.HoodieReplaceCommitMetadata convertRepl public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient, TableFileSystemView fileSystemView, HoodieInstant instant, List replacedPartitions) { + // There is no file id to be replaced in the very first replace commit file for insert overwrite operation + if (replacedPartitions.isEmpty()) { + LOG.warn("Found no partition files to replace"); + return true; + } context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups"); List f = context.map(replacedPartitions, partition -> { Stream fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java index 7061dfbf70acc..bfc09df078f5e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; + import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; @@ -37,8 +38,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.CleanerUtils; -import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; /** * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}. @@ -72,10 +73,21 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class); archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata)); + } else if (hoodieInstant.isInflight()) { + // inflight replacecommit files have the same meta data body as HoodieCommitMetadata + // so we could re-use it without further creating an inflight extension. + // Or inflight replacecommit files are empty under clustering circumstance + Option inflightCommitMetadata = getInflightReplaceMetadata(metaClient, hoodieInstant); + if (inflightCommitMetadata.isPresent()) { + archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get())); + } } else { - HoodieRequestedReplaceMetadata requestedReplaceMetadata = - ClusteringUtils.getRequestedReplaceMetadata(metaClient, hoodieInstant).get(); - archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata); + // we may have cases with empty HoodieRequestedReplaceMetadata e.g. insert_overwrite_table or insert_overwrite + // without clustering. However, we should revisit the requested commit file standardization + Option requestedReplaceMetadata = getRequestedReplaceMetadata(metaClient, hoodieInstant); + if (requestedReplaceMetadata.isPresent()) { + archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get()); + } } archivedMetaWrapper.setActionType(ActionType.replacecommit.name()); break; @@ -107,14 +119,25 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst return archivedMetaWrapper; } - public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant, - HoodieCommitMetadata hoodieCommitMetadata) { - HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry(); - archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp()); - archivedMetaWrapper.setActionState(hoodieInstant.getState().name()); - archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(hoodieCommitMetadata)); - archivedMetaWrapper.setActionType(ActionType.commit.name()); - return archivedMetaWrapper; + public static Option getInflightReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException { + Option inflightContent = metaClient.getActiveTimeline().getInstantDetails(instant); + if (!inflightContent.isPresent() || inflightContent.get().length == 0) { + // inflight files can be empty in some certain cases, e.g. when users opt in clustering + return Option.empty(); + } + return Option.of(HoodieCommitMetadata.fromBytes(inflightContent.get(), HoodieCommitMetadata.class)); + } + + public static Option getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException { + Option requestedContent = metaClient.getActiveTimeline().getInstantDetails(instant); + if (!requestedContent.isPresent() || requestedContent.get().length == 0) { + // requested commit files can be empty in some certain cases, e.g. insert_overwrite or insert_overwrite_table. + // However, it appears requested files are supposed to contain meta data and we should revisit the standardization + // of requested commit files + // TODO revisit requested commit file standardization https://issues.apache.org/jira/browse/HUDI-1739 + return Option.empty(); + } + return Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get())); } public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 7993f0282ea75..e2105a70d92b6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -296,7 +296,6 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre public void archive(HoodieEngineContext context, List instants) throws HoodieCommitException { try { - HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); LOG.info("Wrapper schema " + wrapperSchema.toString()); List records = new ArrayList<>(); @@ -308,7 +307,7 @@ public void archive(HoodieEngineContext context, List instants) t } try { deleteAnyLeftOverMarkerFiles(context, hoodieInstant); - records.add(convertToAvroRecord(commitTimeline, hoodieInstant)); + records.add(convertToAvroRecord(hoodieInstant)); if (records.size() >= this.config.getCommitArchivalBatchSize()) { writeToFile(wrapperSchema, records); } @@ -365,8 +364,8 @@ private void writeToFile(Schema wrapperSchema, List records) thro } } - private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant) - throws IOException { + private IndexedRecord convertToAvroRecord(HoodieInstant hoodieInstant) + throws IOException { return MetadataConversionUtils.createMetaWrapper(hoodieInstant, metaClient); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java index 9483d61fb57ce..abe2a945628a4 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -383,7 +383,7 @@ private void createReplaceRequested(String instantTime) throws Exception { requestedReplaceMetadata.setClusteringPlan(clusteringPlan); requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); HoodieTestTable.of(metaClient) - .addRequestedReplace(instantTime, requestedReplaceMetadata) + .addRequestedReplace(instantTime, Option.of(requestedReplaceMetadata)) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } @@ -413,7 +413,7 @@ private void createReplace(String instantTime, WriteOperationType writeOperation requestedReplaceMetadata.setClusteringPlan(clusteringPlan); requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); HoodieTestTable.of(metaClient) - .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata) + .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.empty(), replaceMetadata) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java index d8a51708f6df5..bdc8eb62608f7 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java @@ -20,6 +20,7 @@ import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import java.io.IOException; import java.util.ArrayList; @@ -80,13 +81,43 @@ public void testCompletedClean() throws Exception { @Test public void testCompletedReplace() throws Exception { String newCommitTime = HoodieTestTable.makeNewCommitTime(); - createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE); + createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE, true); HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper( new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient); assertEquals(metaEntry.getActionState(), State.COMPLETED.toString()); assertEquals(metaEntry.getHoodieReplaceCommitMetadata().getOperationType(), WriteOperationType.INSERT_OVERWRITE.toString()); } + @Test + public void testEmptyRequestedReplace() throws Exception { + String newCommitTime = HoodieTestTable.makeNewCommitTime(); + createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE_TABLE, false); + HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper( + new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient); + assertEquals(metaEntry.getActionState(), State.REQUESTED.toString()); + assertNull(metaEntry.getHoodieRequestedReplaceMetadata()); + } + + @Test + public void testEmptyInflightReplace() throws Exception { + String newCommitTime = HoodieTestTable.makeNewCommitTime(); + createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE_TABLE, true); + HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper( + new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient); + assertEquals(metaEntry.getActionState(), State.INFLIGHT.toString()); + assertNull(metaEntry.getHoodieInflightReplaceMetadata()); + } + + @Test + public void testNonEmptyInflightReplace() throws Exception { + String newCommitTime = HoodieTestTable.makeNewCommitTime(); + createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE_TABLE, false); + HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper( + new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient); + assertEquals(metaEntry.getActionState(), State.INFLIGHT.toString()); + assertEquals(metaEntry.getHoodieInflightReplaceMetadata().getOperationType(), WriteOperationType.INSERT_OVERWRITE_TABLE.name()); + } + @Test public void testCompletedCommitOrDeltaCommit() throws Exception { String newCommitTime = HoodieTestTable.makeNewCommitTime(); @@ -169,7 +200,8 @@ private void createCommitMetadata(String instantTime) throws Exception { .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } - private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception { + private void createReplace(String instantTime, WriteOperationType writeOperationType, Boolean isClustering) + throws Exception { String fileId1 = "file-1"; String fileId2 = "file-2"; @@ -182,18 +214,29 @@ private void createReplace(String instantTime, WriteOperationType writeOperation writeStat.setFileId("file-1"); replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat); replaceMetadata.setOperationType(writeOperationType); - // create replace instant to mark fileId1 as deleted - HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); - requestedReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE.name()); - HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan(); - HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); - HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); - clusteringGroup.setSlices(Arrays.asList(sliceInfo)); - clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); - requestedReplaceMetadata.setClusteringPlan(clusteringPlan); - requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); + // some cases requestedReplaceMetadata will be null + // e.g. insert_overwrite_table or insert_overwrite without clustering + HoodieRequestedReplaceMetadata requestedReplaceMetadata = null; + HoodieCommitMetadata inflightReplaceMetadata = null; + if (isClustering) { + requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); + requestedReplaceMetadata.setOperationType(writeOperationType.name()); + HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan(); + HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup(); + HoodieSliceInfo sliceInfo = new HoodieSliceInfo(); + clusteringGroup.setSlices(Arrays.asList(sliceInfo)); + clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup)); + requestedReplaceMetadata.setClusteringPlan(clusteringPlan); + requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION); + } else { + // inflightReplaceMetadata will be null in clustering but not null + // in insert_overwrite or insert_overwrite_table + inflightReplaceMetadata = new HoodieCommitMetadata(); + inflightReplaceMetadata.setOperationType(writeOperationType); + inflightReplaceMetadata.setCompacted(false); + } HoodieTestTable.of(metaClient) - .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata) + .addReplaceCommit(instantTime, Option.ofNullable(requestedReplaceMetadata), Option.ofNullable(inflightReplaceMetadata), replaceMetadata) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index b42f650c3c1b5..ea80b089e57bf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -241,6 +241,10 @@ public void testArchiveTableWithReplacedFiles() throws Exception { .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) .build(); + // when using insert_overwrite or insert_overwrite_table + // first commit may without replaceFileIds + createReplaceMetadataWithoutReplaceFileId("000"); + int numCommits = 4; int commitInstant = 100; for (int i = 0; i < numCommits; i++) { @@ -251,7 +255,7 @@ public void testArchiveTableWithReplacedFiles() throws Exception { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match"); + assertEquals(5, timeline.countInstants(), "Loaded 5 commits and the count should match"); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); boolean result = archiveLog.archiveIfRequired(context); assertTrue(result); @@ -513,13 +517,7 @@ public void testConvertCommitMetadata() { HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata(); hoodieCommitMetadata.setOperationType(WriteOperationType.INSERT); - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-commitMetadata-converter") - .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) - .build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = MetadataConversionUtils .convertCommitMetadata(hoodieCommitMetadata); @@ -681,6 +679,22 @@ public void testArchiveInflightClean() throws IOException { assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2, notArchivedInstant3), ""); } + private void createReplaceMetadataWithoutReplaceFileId(String instantTime) throws Exception { + + // create replace instant without a previous replace commit + HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() + .setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE.toString()) + .setVersion(1) + .setExtraMetadata(Collections.emptyMap()) + .build(); + HoodieReplaceCommitMetadata completeReplaceMetadata = new HoodieReplaceCommitMetadata(); + HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); + completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE); + inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE); + HoodieTestTable.of(metaClient) + .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.of(inflightReplaceMetadata), completeReplaceMetadata); + } + private void createReplaceMetadata(String instantTime) throws Exception { String fileId1 = "file-" + instantTime + "-1"; String fileId2 = "file-" + instantTime + "-2"; @@ -691,11 +705,13 @@ private void createReplaceMetadata(String instantTime) throws Exception { .setVersion(1) .setExtraMetadata(Collections.emptyMap()) .build(); - HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); - replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1); - replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); + HoodieReplaceCommitMetadata completeReplaceMetadata = new HoodieReplaceCommitMetadata(); + HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); + completeReplaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1); + completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); + inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); HoodieTestTable.of(metaClient) - .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata) + .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.of(inflightReplaceMetadata), completeReplaceMetadata) .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 83bb684294024..14abb4600ac88 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -843,10 +843,11 @@ public void testCleanWithReplaceCommits() throws Exception { assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1 + // notice that clustering generates empty inflight commit files Map partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0); String file2P0C1 = partitionAndFileId002.get(p0); Pair replaceMetadata = generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1); - testTable.addReplaceCommit("00000000000002", replaceMetadata.getKey(), replaceMetadata.getValue()); + testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); // run cleaner List hoodieCleanStatsTwo = runCleaner(config); @@ -856,10 +857,11 @@ public void testCleanWithReplaceCommits() throws Exception { assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0 + // notice that clustering generates empty inflight commit files Map partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1); String file3P1C2 = partitionAndFileId003.get(p1); replaceMetadata = generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2); - testTable.addReplaceCommit("00000000000003", replaceMetadata.getKey(), replaceMetadata.getValue()); + testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); // run cleaner List hoodieCleanStatsThree = runCleaner(config); @@ -870,10 +872,11 @@ public void testCleanWithReplaceCommits() throws Exception { assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next replacecommit, with 1 clustering operation. Replace data in p0 again + // notice that clustering generates empty inflight commit files Map partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0); String file4P0C3 = partitionAndFileId004.get(p0); replaceMetadata = generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3); - testTable.addReplaceCommit("00000000000004", replaceMetadata.getKey(), replaceMetadata.getValue()); + testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); // run cleaner List hoodieCleanStatsFour = runCleaner(config); @@ -885,10 +888,11 @@ public void testCleanWithReplaceCommits() throws Exception { assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created + // notice that clustering generates empty inflight commit files Map partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1); String file4P1C4 = partitionAndFileId005.get(p1); replaceMetadata = generateReplaceCommitMetadata(p0, file3P1C2, file4P1C4); - testTable.addReplaceCommit("00000000000005", replaceMetadata.getKey(), replaceMetadata.getValue()); + testTable.addReplaceCommit("00000000000005", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); List hoodieCleanStatsFive = runCleaner(config, 2); assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); diff --git a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc index 60be522495c72..c052147f718ea 100644 --- a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc +++ b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc @@ -112,6 +112,14 @@ "HoodieRequestedReplaceMetadata" ], "default": null + }, + { + "name":"HoodieInflightReplaceMetadata", + "type":[ + "null", + "HoodieCommitMetadata" + ], + "default": null } ] } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 9409566b4e65e..7f2297f2863cc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.FileSystem; @@ -156,12 +157,20 @@ public static void createReplaceCommit(String basePath, String instantTime, Hood createMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8)); } - public static void createRequestedReplaceCommit(String basePath, String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION, serializeRequestedReplaceMetadata(requestedReplaceMetadata).get()); + public static void createRequestedReplaceCommit(String basePath, String instantTime, Option requestedReplaceMetadata) throws IOException { + if (requestedReplaceMetadata.isPresent()) { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION, serializeRequestedReplaceMetadata(requestedReplaceMetadata.get()).get()); + } else { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION); + } } - public static void createInflightReplaceCommit(String basePath, String instantTime) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION); + public static void createInflightReplaceCommit(String basePath, String instantTime, Option inflightReplaceMetadata) throws IOException { + if (inflightReplaceMetadata.isPresent()) { + createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION, inflightReplaceMetadata.get().toJsonString().getBytes(StandardCharsets.UTF_8)); + } else { + createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION); + } } public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata) throws IOException { @@ -197,10 +206,6 @@ public static void createRequestedCompaction(String basePath, String instantTime createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION); } - public static void createInflightCompaction(String basePath, String instantTime) throws IOException { - createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION); - } - public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 98b0f90b7fbee..bfdff9dfdb72d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -64,7 +64,6 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; -import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile; @@ -127,13 +126,6 @@ public HoodieTestTable addRequestedCommit(String instantTime) throws Exception { return this; } - public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception { - createRequestedDeltaCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - public HoodieTestTable addInflightCommit(String instantTime) throws Exception { createRequestedCommit(basePath, instantTime); createInflightCommit(basePath, instantTime); @@ -142,14 +134,6 @@ public HoodieTestTable addInflightCommit(String instantTime) throws Exception { return this; } - public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception { - createRequestedDeltaCommit(basePath, instantTime); - createInflightDeltaCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - public HoodieTestTable addCommit(String instantTime) throws Exception { createRequestedCommit(basePath, instantTime); createInflightCommit(basePath, instantTime); @@ -177,16 +161,20 @@ public HoodieTestTable addDeltaCommit(String instantTime) throws Exception { return this; } - public HoodieTestTable addReplaceCommit(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, HoodieReplaceCommitMetadata metadata) throws Exception { + public HoodieTestTable addReplaceCommit( + String instantTime, + Option requestedReplaceMetadata, + Option inflightReplaceMetadata, + HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception { createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata); - createInflightReplaceCommit(basePath, instantTime); - createReplaceCommit(basePath, instantTime, metadata); + createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata); + createReplaceCommit(basePath, instantTime, completeReplaceMetadata); currentInstantTime = instantTime; metaClient = HoodieTableMetaClient.reload(metaClient); return this; } - public HoodieTestTable addRequestedReplace(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws Exception { + public HoodieTestTable addRequestedReplace(String instantTime, Option requestedReplaceMetadata) throws Exception { createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata); currentInstantTime = instantTime; metaClient = HoodieTableMetaClient.reload(metaClient); @@ -246,14 +234,6 @@ public HoodieTestTable addRequestedCompaction(String instantTime, FileSlice... f return addRequestedCompaction(instantTime, plan); } - public HoodieTestTable addCompaction(String instantTime) throws IOException { - createRequestedCompaction(basePath, instantTime); - createInflightCompaction(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - public HoodieTestTable forCommit(String instantTime) { currentInstantTime = instantTime; return this; @@ -269,11 +249,6 @@ public HoodieTestTable forReplaceCommit(String instantTime) { return this; } - public HoodieTestTable forCompaction(String instantTime) { - currentInstantTime = instantTime; - return this; - } - public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) throws IOException { for (String partitionPath : partitionPaths) { FileCreateUtils.createPartitionMetaFile(basePath, partitionPath); @@ -281,10 +256,6 @@ public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) throws I return this; } - public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException { - return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType); - } - public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException { createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType); return this; @@ -356,10 +327,6 @@ public HoodieTestTable withLogFile(String partitionPath, String fileId, int... v return this; } - public boolean inflightCommitsExist(String... instantTime) { - return Arrays.stream(instantTime).allMatch(this::inflightCommitExists); - } - public boolean inflightCommitExists(String instantTime) { try { return fs.exists(getInflightCommitFilePath(instantTime)); @@ -368,10 +335,6 @@ public boolean inflightCommitExists(String instantTime) { } } - public boolean commitsExist(String... instantTime) { - return Arrays.stream(instantTime).allMatch(this::commitExists); - } - public boolean commitExists(String instantTime) { try { return fs.exists(getCommitFilePath(instantTime)); @@ -388,10 +351,6 @@ public boolean baseFilesExist(Map partitionAndFileId, String ins }); } - public boolean baseFilesExist(String partition, String instantTime, String... fileIds) { - return Arrays.stream(fileIds).allMatch(f -> baseFileExists(partition, instantTime, f)); - } - public boolean baseFileExists(String partition, String instantTime, String fileId) { try { return fs.exists(new Path(Paths.get(basePath, partition, baseFileName(instantTime, fileId)).toString()));