diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
index eb0e8711a484a..3b0829b1655cb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -18,23 +18,26 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.Assertions;
 import org.apache.hudi.testutils.HoodieClientTestBase;
-
 import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -96,4 +99,61 @@ protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice> firstPartiti
       assertEquals(1, secondPartitionCommit2FileSlices.size());
     }
   }
+
+  protected void insertOverwriteCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
+                                                            List<FileSlice> secondPartitionCommit2FileSlices,
+                                                            HoodieWriteConfig cfg,
+                                                            boolean commitSecondInsertOverwrite) throws IOException {
+    //just generate two partitions
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    /**
+     * Write 1 (upsert)
+     */
+    String newCommitTime = "001";
+    List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    client.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+    client.commit(newCommitTime, statuses);
+
+    // get fileIds written
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
+    List<HoodieFileGroup> firstPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionCommit1FileGroups.size());
+    Set<String> partition1Commit1FileIds = firstPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+    List<HoodieFileGroup> secondPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionCommit1FileGroups.size());
+    Set<String> partition2Commit1FileIds = secondPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+
+    /**
+     * Write 2 (one insert_overwrite)
+     */
+    String commitActionType = HoodieTimeline.REPLACE_COMMIT_ACTION;
+    newCommitTime = "002";
+    records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    writeRecords = jsc.parallelize(records, 1);
+    client.startCommitWithTime(newCommitTime, commitActionType);
+    HoodieWriteResult result = client.insertOverwrite(writeRecords, newCommitTime);
+    statuses = result.getWriteStatuses();
+    Assertions.assertNoWriteErrors(statuses.collect());
+    if (commitSecondInsertOverwrite) {
+      client.commit(newCommitTime, statuses, Option.empty(), commitActionType, result.getPartitionToReplaceFileIds());
+    }
+    metaClient.reloadActiveTimeline();
+    // get new fileIds written as part of insert_overwrite
+    fsView = getFileSystemViewWithUnCommittedSlices(metaClient);
+    List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH)
+        .filter(fg -> !partition1Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+    List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH)
+        .filter(fg -> !partition2Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+    secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+
+    assertEquals(1, firstPartitionCommit2FileSlices.size());
+    assertEquals(1, secondPartitionCommit2FileSlices.size());
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e14dbf9c66142..030cc3e5d2702 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -28,7 +28,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -125,6 +124,19 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile()
     assertFalse(testTable.baseFileExists(p2, "002", "id22"));
   }
 
+  // Verify that rollback works with replacecommit
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCopyOnWriteRollbackWithReplaceCommits(boolean isUsingMarkers) throws IOException {
+    //1. prepare data and assert data result
+    List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+    List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+    HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
+    this.insertOverwriteCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
@@ -133,8 +145,14 @@ public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws
     List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
     HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
     this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+    metaClient.reloadActiveTimeline();
     HoodieTable table = this.getHoodieTable(metaClient, cfg);
-
+    performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
+  }
+
+  private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
+                                          List<FileSlice> firstPartitionCommit2FileSlices,
+                                          List<FileSlice> secondPartitionCommit2FileSlices) throws IOException {
     //2. rollback
     HoodieInstant commitInstant;
     if (isUsingMarkers) {
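
Note (not part of the patch above): the new insertOverwriteCommitDataWithTwoPartitions helper stages an upsert commit ("001") followed by an insert_overwrite replacecommit ("002"), committing the second write only when commitSecondInsertOverwrite is true; testCopyOnWriteRollbackWithReplaceCommits passes !isUsingMarkers, so marker-based rollback is exercised against an inflight replacecommit and file-listing rollback against a committed one. A minimal sketch of how another test might reuse the helper, assuming it runs inside a subclass of HoodieClientRollbackTestBase with the same imports and members (dataGen, fs, jsc, metaClient) as the patch:

    // Illustrative only; the variable names below are hypothetical, not from the patch.
    List<FileSlice> firstPartitionSlices = new ArrayList<>();
    List<FileSlice> secondPartitionSlices = new ArrayList<>();
    HoodieWriteConfig cfg = getConfigBuilder()
        .withRollbackUsingMarkers(true)   // roll back using marker files
        .withAutoCommit(false)
        .build();
    // Passing false leaves the "002" replacecommit inflight, so marker-based rollback
    // still has markers to work with; pass true to test rollback of a committed replacecommit.
    insertOverwriteCommitDataWithTwoPartitions(firstPartitionSlices, secondPartitionSlices, cfg, false);
    HoodieTable table = getHoodieTable(metaClient, cfg);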