@@ -69,7 +69,7 @@ public class RepairsCommand {
@ShellMethod(key = "repair deduplicate",
value = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
public String deduplicate(
-@ShellOption(value = {"--duplicatedPartitionPath"}, help = "Partition Path containing the duplicates")
+@ShellOption(value = {"--duplicatedPartitionPath"}, defaultValue = "", help = "Partition Path containing the duplicates")
final String duplicatedPartitionPath,
@ShellOption(value = {"--repairedOutputPath"}, help = "Location to place the repaired files")
final String repairedOutputPath,
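With defaultValue = "" on --duplicatedPartitionPath, the option can now be omitted for a non-partitioned table and an empty partition path is passed through to the deduplication logic. A minimal usage sketch of the resulting CLI call, mirroring the command string built in the new integration test below (the output path is a placeholder):

repair deduplicate --repairedOutputPath /tmp/repaired-files --sparkMaster local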
@@ -69,6 +69,7 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
private String duplicatedPartitionPath;
private String duplicatedPartitionPathWithUpdates;
private String duplicatedPartitionPathWithUpserts;
+private String duplicatedNoPartitionPath;
private String repairedOutputPath;

private HoodieFileFormat fileFormat;
@@ -78,6 +79,7 @@ public void init() throws Exception {
duplicatedPartitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
duplicatedPartitionPathWithUpdates = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
duplicatedPartitionPathWithUpserts = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+duplicatedNoPartitionPath = HoodieTestDataGenerator.NO_PARTITION_PATH;
repairedOutputPath = Paths.get(basePath, "tmp").toString();

HoodieCLI.conf = jsc.hadoopConfiguration();
@@ -135,6 +137,23 @@ public void init() throws Exception {
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);

+// init cow table for non-partitioned table tests
+String cowNonPartitionedTablePath = Paths.get(basePath, "cow_table_non_partitioned").toString();
+
+// Create cow table and connect
+new TableCommand().createTable(
+cowNonPartitionedTablePath, "cow_table_non_partitioned", HoodieTableType.COPY_ON_WRITE.name(),
+"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+
+cowNonPartitionedTable.addCommit("20160401010101")
+.withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "1", hoodieRecords1)
+.getFileIdWithLogFile(HoodieTestDataGenerator.NO_PARTITION_PATH);
+
+cowNonPartitionedTable.addCommit("20160401010202")
+.withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "2", dupRecords);
+
fileFormat = metaClient.getTableConfig().getBaseFileFormat();
}

@@ -232,6 +251,39 @@ public void testDeduplicateWithUpserts(HoodieTableType tableType) throws IOException {
assertEquals(100, result.count());
}

+/**
+* Test case for dry run deduplicate on a non-partitioned dataset.
+*/
+@ParameterizedTest
+@EnumSource(value = HoodieTableType.class)
+public void testDeduplicateNoPartitionWithInserts(HoodieTableType tableType) throws IOException {
+String tablePath = Paths.get(basePath, "cow_table_non_partitioned").toString();
+connectTableAndReloadMetaClient(tablePath);
+HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+fs.listStatus(new Path(Paths.get(tablePath, duplicatedNoPartitionPath).toString())));
+List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+assertEquals(2, filteredStatuses.size(), "There should be 2 files.");
+
+// Before deduplicate, all files contain 110 records
+String[] files = filteredStatuses.toArray(new String[0]);
+Dataset df = readFiles(files);
+assertEquals(110, df.count());
+
+// use default value without specifying duplicatedPartitionPath
+String cmdStr = String.format("repair deduplicate --repairedOutputPath %s --sparkMaster %s",
+repairedOutputPath, "local");
+Object resultForCmd = shell.evaluate(() -> cmdStr);
+assertTrue(ShellEvaluationResultUtil.isSuccess(resultForCmd));
+assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, resultForCmd.toString());
+
+// After deduplicate, there are 100 records
+FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+Dataset result = readFiles(files);
+assertEquals(100, result.count());
+}
+
/**
* Test case for real run deduplicate.
*/
@@ -89,6 +89,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
// with default bloom filter with 60,000 entries and 0.000000001 FPRate
public static final int BLOOM_FILTER_BYTES = 323495;
private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class);
+public static final String NO_PARTITION_PATH = "";
public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";