From bc0745733c40850504e6f5e1022456fe2cab6afc Mon Sep 17 00:00:00 2001 From: sivabalan Date: Fri, 29 Jul 2022 15:15:47 +0530 Subject: [PATCH 1/2] Fixing minor bug in listing based rollback request generation --- .../ListingBasedRollbackStrategy.java | 6 +- ...TestCopyOnWriteRollbackActionExecutor.java | 74 +++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java index aa9e0b6583a24..10b02d7fff200 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java @@ -242,11 +242,11 @@ private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant instantToRollbac return fs.listStatus(Arrays.stream(filePaths).filter(entry -> { try { return fs.exists(entry); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Exists check failed for " + entry.toString(), e); } - // if IOException is thrown, do not ignore. lets try to add the file of interest to be deleted. we can't miss any files to be rolled back. - return false; + // if any Exception is thrown, do not ignore. let's try to add the file of interest to be deleted. we can't miss any files to be rolled back. 
+ return true; }).toArray(Path[]::new), pathFilter); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index e5dd5b087aa23..3837d5a6fe1d1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -20,24 +20,37 @@ import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.avro.model.HoodieRollbackRequest; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkersFactory; +import org.apache.hudi.testutils.Assertions; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import 
org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -45,10 +58,12 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackTestBase { @BeforeEach @@ -133,6 +148,65 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() assertFalse(testTable.baseFileExists(p2, "002", "id22")); } + @Test + public void testListBasedRollbackStrategy() throws Exception { + // data generator is configured with three partitions + dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}); + HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(false).build(); + // 1. 
prepare data + HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath); + SparkRDDWriteClient client = getHoodieWriteClient(cfg); + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3); + JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD statuses = client.upsert(writeRecords, newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + //client.commit(newCommitTime, statuses); + + /** + * Write 2 (updates) + */ + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + //client.commit(newCommitTime, statuses); + + context = new HoodieSparkEngineContext(jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = this.getHoodieTable(metaClient, cfg); + HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002"); + String rollbackInstant = "003"; + + ListingBasedRollbackStrategy rollbackStrategy = new ListingBasedRollbackStrategy(table, context, table.getConfig(), rollbackInstant); + List rollBackRequests = rollbackStrategy.getRollbackRequests(needRollBackInstant); + rollBackRequests.forEach(entry -> System.out.println(" " + entry.getPartitionPath() + ", " + entry.getFileId() + " " + + Arrays.toString(entry.getFilesToBeDeleted().toArray()))); + + HoodieRollbackRequest rollbackRequest = rollBackRequests.stream().filter(entry -> entry.getPartitionPath().equals(DEFAULT_FIRST_PARTITION_PATH)).findFirst().get(); + + FileSystem fs = Mockito.mock(FileSystem.class); + MockitoAnnotations.initMocks(this); + + // mock to throw exception when fs.exists() is invoked + 
System.out.println("Fs.exists() call for " + rollbackRequest.getFilesToBeDeleted().get(0).toString()); + Mockito.when(fs.exists(any())) + .thenThrow(new IOException("Failing exists call for " + rollbackRequest.getFilesToBeDeleted().get(0))); + + rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg, rollbackInstant); + List rollBackRequestsUpdated = rollbackStrategy.getRollbackRequests(needRollBackInstant); + rollBackRequestsUpdated.forEach(entry -> System.out.println(" " + entry.getPartitionPath() + ", " + entry.getFileId() + " " + + Arrays.toString(entry.getFilesToBeDeleted().toArray()))); + + assertEquals(rollBackRequests, rollBackRequestsUpdated); + } + + // Verify that rollback works with replacecommit @ParameterizedTest @ValueSource(booleans = {true, false}) From 3b0ae706dfab50acd0216b81fcbc4a331a94c530 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 1 Aug 2022 10:37:39 -0700 Subject: [PATCH 2/2] addressing comments --- .../rollback/TestCopyOnWriteRollbackActionExecutor.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 3837d5a6fe1d1..237f06917824c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -156,26 +156,19 @@ public void testListBasedRollbackStrategy() throws Exception { // 1. 
prepare data HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath); SparkRDDWriteClient client = getHoodieWriteClient(cfg); - /** - * Write 1 (only inserts) - */ + String newCommitTime = "001"; client.startCommitWithTime(newCommitTime); List records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3); JavaRDD writeRecords = jsc.parallelize(records, 1); JavaRDD statuses = client.upsert(writeRecords, newCommitTime); Assertions.assertNoWriteErrors(statuses.collect()); - //client.commit(newCommitTime, statuses); - /** - * Write 2 (updates) - */ newCommitTime = "002"; client.startCommitWithTime(newCommitTime); records = dataGen.generateUpdates(newCommitTime, records); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime); Assertions.assertNoWriteErrors(statuses.collect()); - //client.commit(newCommitTime, statuses); context = new HoodieSparkEngineContext(jsc); metaClient = HoodieTableMetaClient.reload(metaClient);