diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
index e471ed9258658..8fea91a075672 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
@@ -78,7 +78,7 @@ class DedupeSparkJob(basePath: String,
     val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
 
     val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
-    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
     val filteredStatuses = latestFiles.map(f => f.getPath)
     LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")
@@ -187,7 +187,7 @@ class DedupeSparkJob(basePath: String,
     val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
 
     val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
-    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
 
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
 
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index d7d6872bc1b4b..52b8aed8de00d 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -39,7 +39,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.Dataset;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.springframework.shell.core.CommandResult;
 
 import java.io.IOException;
@@ -63,58 +64,86 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
   private String duplicatedPartitionPathWithUpdates;
   private String duplicatedPartitionPathWithUpserts;
   private String repairedOutputPath;
+  private HoodieFileFormat fileFormat;
 
   @BeforeEach
   public void init() throws Exception {
-    final String tablePath = Paths.get(basePath, "test_table").toString();
-    duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
-    duplicatedPartitionPathWithUpdates = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).toString();
-    duplicatedPartitionPathWithUpserts = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH).toString();
+    duplicatedPartitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    duplicatedPartitionPathWithUpdates = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    duplicatedPartitionPathWithUpserts = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
     repairedOutputPath = Paths.get(basePath, "tmp").toString();
 
     HoodieCLI.conf = jsc.hadoopConfiguration();
 
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
-    // Create table and connect
+    // generate 200 records
+    HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
+    HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
+
+    // generate duplicates
+    HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
+
+    // init cow table
+    String cowTablePath = Paths.get(basePath, HoodieTableType.COPY_ON_WRITE.name()).toString();
+
+    // Create cow table and connect
     new TableCommand().createTable(
-        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        cowTablePath, "cow_table", HoodieTableType.COPY_ON_WRITE.name(),
         "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
 
-    // generate 200 records
-    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
-    HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+    HoodieSparkWriteableTestTable cowTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
 
-    HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
-    HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
-    testTable.addCommit("20160401010101")
+    cowTable.addCommit("20160401010101")
        .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
        .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
        .getFileIdWithLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
 
-    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
+    cowTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
        .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1);
 
-    // read records and get 10 to generate duplicates
-    HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
-    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords);
-    testTable.addCommit("20160401010202")
+    cowTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords);
+    cowTable.addCommit("20160401010202")
        .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords);
-    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
+    cowTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
        .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);
 
-    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+    // init mor table
+    String morTablePath = Paths.get(basePath, HoodieTableType.MERGE_ON_READ.name()).toString();
+    // Create mor table and connect
+    new TableCommand().createTable(
+        morTablePath, "mor_table", HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+    HoodieSparkWriteableTestTable morTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+
+    morTable.addDeltaCommit("20160401010101");
+    morTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
+        .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
+        .getFileIdWithLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+
+    morTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
+        .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1)
+        .withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords);
+    morTable.addDeltaCommit("20160401010202");
+    morTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords)
+        .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
+        .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);
+
+    fileFormat = metaClient.getTableConfig().getBaseFileFormat();
   }
 
   /**
    * Test case for dry run deduplicate.
    */
-  @Test
-  public void testDeduplicateWithInserts() throws IOException {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testDeduplicateWithInserts(HoodieTableType tableType) throws IOException {
+    String tablePath = Paths.get(basePath, tableType.name()).toString();
+    connectTableAndReloadMetaClient(tablePath);
     // get fs and check number of latest files
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-        fs.listStatus(new Path(duplicatedPartitionPath)));
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPath).toString())));
     List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
     assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
@@ -137,11 +166,14 @@ public void testDeduplicateWithInserts() throws IOException {
     assertEquals(200, result.count());
   }
 
-  @Test
-  public void testDeduplicateWithUpdates() throws IOException {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testDeduplicateWithUpdates(HoodieTableType tableType) throws IOException {
+    String tablePath = Paths.get(basePath, tableType.name()).toString();
+    connectTableAndReloadMetaClient(tablePath);
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-        fs.listStatus(new Path(duplicatedPartitionPathWithUpdates)));
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPathWithUpdates).toString())));
     List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
     assertEquals(2, filteredStatuses.size(), "There should be 2 files.");
@@ -164,11 +196,14 @@ public void testDeduplicateWithUpdates() throws IOException {
     assertEquals(100, result.count());
   }
 
-  @Test
-  public void testDeduplicateWithUpserts() throws IOException {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testDeduplicateWithUpserts(HoodieTableType tableType) throws IOException {
+    String tablePath = Paths.get(basePath, tableType.name()).toString();
+    connectTableAndReloadMetaClient(tablePath);
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-        fs.listStatus(new Path(duplicatedPartitionPathWithUpserts)));
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPathWithUpserts).toString())));
     List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
     assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
@@ -194,12 +229,15 @@ public void testDeduplicateWithUpserts() throws IOException {
   /**
    * Test case for real run deduplicate.
    */
-  @Test
-  public void testDeduplicateWithReal() throws IOException {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testDeduplicateWithReal(HoodieTableType tableType) throws IOException {
+    String tablePath = Paths.get(basePath, tableType.name()).toString();
+    connectTableAndReloadMetaClient(tablePath);
     // get fs and check number of latest files
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-        fs.listStatus(new Path(duplicatedPartitionPath)));
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPath).toString())));
     List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
     assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
@@ -216,12 +254,17 @@ public void testDeduplicateWithReal() throws IOException {
     assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + partitionPath, cr.getResult().toString());
 
     // After deduplicate, there are 200 records under partition path
-    FileStatus[] fileStatus = fs.listStatus(new Path(duplicatedPartitionPath));
+    FileStatus[] fileStatus = fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPath).toString()));
     files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
     Dataset<Row> result = readFiles(files);
     assertEquals(200, result.count());
   }
 
+  private void connectTableAndReloadMetaClient(String tablePath) throws IOException {
+    new TableCommand().connect(tablePath, TimelineLayoutVersion.VERSION_1, false, 0, 0, 0);
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
   private Dataset<Row> readFiles(String[] files) {
     if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
       return sqlContext.read().parquet(files);
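Background on the timeline call changed above: in Hudi's timeline API, getCommitTimeline() returns only COMMIT instants, while getCommitsTimeline() also includes DELTA_COMMIT instants, which is how MERGE_ON_READ tables complete their writes. The sketch below is illustrative only and not part of the patch; the `metadata` and `allFiles` parameters simply mirror the names used in DedupeSparkJob.

// Illustrative sketch: contrasts the two timelines the dedupe job can build its
// file-system view from. `metadata` and `allFiles` are assumed to be prepared
// as in DedupeSparkJob (a meta client plus a listing of the partition path).
import org.apache.hadoop.fs.FileStatus
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView

def buildViews(metadata: HoodieTableMetaClient, allFiles: Array[FileStatus]): Unit = {
  // COMMIT instants only: for a MERGE_ON_READ table, whose ingests complete as
  // DELTA_COMMITs, this timeline may hold no completed instants, so
  // getLatestBaseFiles() would find nothing to deduplicate.
  val commitOnlyView = new HoodieTableFileSystemView(
    metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)

  // COMMIT + DELTA_COMMIT instants: covers both COPY_ON_WRITE and MERGE_ON_READ,
  // which is what the patched code relies on.
  val commitsView = new HoodieTableFileSystemView(
    metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)

  println(s"commit-only base files: ${commitOnlyView.getLatestBaseFiles.count()}, " +
    s"commits base files: ${commitsView.getLatestBaseFiles.count()}")
}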