DedupeSparkJob.scala

@@ -78,7 +78,7 @@ class DedupeSparkJob(basePath: String,
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()

val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
-val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
val filteredStatuses = latestFiles.map(f => f.getPath)
LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")
@@ -187,7 +187,7 @@ class DedupeSparkJob(basePath: String,
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()

val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
-val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)

val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())

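A note on the one-line change repeated in both hunks above: in Hudi's timeline API, getCommitTimeline() keeps only COMMIT instants, while getCommitsTimeline() also keeps DELTA_COMMIT instants, so the old call hid base files written through MERGE_ON_READ delta commits from the file-system view. A minimal Java sketch of the distinction (assuming a meta client built the same way as in the Scala code above; exact action sets may vary by Hudi version):

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

class TimelineNote {
  // Collect the completed write instants the dedupe job should see.
  static HoodieTimeline completedWrites(HoodieTableMetaClient metaClient) {
    // getCommitTimeline(): COMMIT only -- blind to MERGE_ON_READ delta commits.
    // getCommitsTimeline(): COMMIT + DELTA_COMMIT -- covers both table types.
    return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
  }
}
```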
ITTestRepairsCommand.java

@@ -39,7 +39,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.springframework.shell.core.CommandResult;

import java.io.IOException;
@@ -63,58 +64,86 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
private String duplicatedPartitionPathWithUpdates;
private String duplicatedPartitionPathWithUpserts;
private String repairedOutputPath;

private HoodieFileFormat fileFormat;

@BeforeEach
public void init() throws Exception {
-final String tablePath = Paths.get(basePath, "test_table").toString();
-duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
-duplicatedPartitionPathWithUpdates = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).toString();
-duplicatedPartitionPathWithUpserts = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH).toString();
+duplicatedPartitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+duplicatedPartitionPathWithUpdates = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+duplicatedPartitionPathWithUpserts = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
repairedOutputPath = Paths.get(basePath, "tmp").toString();

HoodieCLI.conf = jsc.hadoopConfiguration();
+Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());

-// Create table and connect
+// generate 200 records
+HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
+HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
+
+// generate duplicates
+HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
+
+// init cow table
+String cowTablePath = Paths.get(basePath, HoodieTableType.COPY_ON_WRITE.name()).toString();
+
+// Create cow table and connect
new TableCommand().createTable(
tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
cowTablePath, "cow_table", HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");

-// generate 200 records
-Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
-HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+HoodieSparkWriteableTestTable cowTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

-HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
-HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
-testTable.addCommit("20160401010101")
+cowTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
.getFileIdWithLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);

-testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
+cowTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1);

-// read records and get 10 to generate duplicates
-HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
-testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords);
-testTable.addCommit("20160401010202")
+cowTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords);
+cowTable.addCommit("20160401010202")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords);
-testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
+cowTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);

-metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+// init mor table
+String morTablePath = Paths.get(basePath, HoodieTableType.MERGE_ON_READ.name()).toString();
+// Create mor table and connect
+new TableCommand().createTable(
+morTablePath, "mor_table", HoodieTableType.MERGE_ON_READ.name(),
+"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+HoodieSparkWriteableTestTable morTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+
+morTable.addDeltaCommit("20160401010101");
+morTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
+.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
+.getFileIdWithLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+
+morTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
+.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1)
+.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords);
+morTable.addDeltaCommit("20160401010202");
+morTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords)
+.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
+.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);

fileFormat = metaClient.getTableConfig().getBaseFileFormat();
}

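Every test below follows the same pattern: the old single-table @Test becomes a JUnit 5 @ParameterizedTest whose @EnumSource feeds it each HoodieTableType constant in turn, and the test resolves the matching fixture directory from the constant's name. A minimal self-contained sketch of those mechanics, with a hypothetical root path and a stand-in enum:

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.nio.file.Paths;

import static org.junit.jupiter.api.Assertions.assertTrue;

class EnumSourceSketch {
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ } // stand-in for HoodieTableType

  @ParameterizedTest
  @EnumSource(TableType.class) // runs once per constant: COPY_ON_WRITE, then MERGE_ON_READ
  void resolvesFixturePerTableType(TableType tableType) {
    // Mirrors how the tests derive their table path: <basePath>/<tableType.name()>
    String tablePath = Paths.get("/tmp/hudi-it", tableType.name()).toString(); // hypothetical root
    assertTrue(tablePath.endsWith(tableType.name()));
  }
}
```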
/**
* Test case for dry run deduplicate.
*/
-@Test
-public void testDeduplicateWithInserts() throws IOException {
+@ParameterizedTest
+@EnumSource(value = HoodieTableType.class)
+public void testDeduplicateWithInserts(HoodieTableType tableType) throws IOException {
+String tablePath = Paths.get(basePath, tableType.name()).toString();
+connectTableAndReloadMetaClient(tablePath);
// get fs and check number of latest files
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-fs.listStatus(new Path(duplicatedPartitionPath)));
+metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPath).toString())));
List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
assertEquals(3, filteredStatuses.size(), "There should be 3 files.");

@@ -137,11 +166,14 @@ public void testDeduplicateWithInserts() throws IOException {
assertEquals(200, result.count());
}

-@Test
-public void testDeduplicateWithUpdates() throws IOException {
+@ParameterizedTest
+@EnumSource(value = HoodieTableType.class)
+public void testDeduplicateWithUpdates(HoodieTableType tableType) throws IOException {
+String tablePath = Paths.get(basePath, tableType.name()).toString();
+connectTableAndReloadMetaClient(tablePath);
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-fs.listStatus(new Path(duplicatedPartitionPathWithUpdates)));
+metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPathWithUpdates).toString())));
List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
assertEquals(2, filteredStatuses.size(), "There should be 2 files.");

@@ -164,11 +196,14 @@ public void testDeduplicateWithUpdates() throws IOException {
assertEquals(100, result.count());
}

-@Test
-public void testDeduplicateWithUpserts() throws IOException {
+@ParameterizedTest
+@EnumSource(value = HoodieTableType.class)
+public void testDeduplicateWithUpserts(HoodieTableType tableType) throws IOException {
+String tablePath = Paths.get(basePath, tableType.name()).toString();
+connectTableAndReloadMetaClient(tablePath);
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-fs.listStatus(new Path(duplicatedPartitionPathWithUpserts)));
+metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPathWithUpserts).toString())));
List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
assertEquals(3, filteredStatuses.size(), "There should be 3 files.");

@@ -194,12 +229,15 @@ public void testDeduplicateWithUpserts() throws IOException {
/**
* Test case for real run deduplicate.
*/
-@Test
-public void testDeduplicateWithReal() throws IOException {
+@ParameterizedTest
+@EnumSource(value = HoodieTableType.class)
+public void testDeduplicateWithReal(HoodieTableType tableType) throws IOException {
+String tablePath = Paths.get(basePath, tableType.name()).toString();
+connectTableAndReloadMetaClient(tablePath);
// get fs and check number of latest files
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-fs.listStatus(new Path(duplicatedPartitionPath)));
+metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPath).toString())));
List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
assertEquals(3, filteredStatuses.size(), "There should be 3 files.");

@@ -216,12 +254,17 @@ public void testDeduplicateWithReal() throws IOException {
assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + partitionPath, cr.getResult().toString());

// After deduplicate, there are 200 records under partition path
-FileStatus[] fileStatus = fs.listStatus(new Path(duplicatedPartitionPath));
+FileStatus[] fileStatus = fs.listStatus(new Path(Paths.get(tablePath, duplicatedPartitionPath).toString()));
files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
Dataset result = readFiles(files);
assertEquals(200, result.count());
}

+private void connectTableAndReloadMetaClient(String tablePath) throws IOException {
+new TableCommand().connect(tablePath, TimelineLayoutVersion.VERSION_1, false, 0, 0, 0);
+metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+}
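A note on the helper above: the CLI shell tracks a single connected table through HoodieCLI's global state, so each parameterized run has to point the CLI at its own table path and reload the cached meta client before asserting anything; without that, the second table type in a run would still see the first table's timeline.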

private Dataset readFiles(String[] files) {
if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
return sqlContext.read().parquet(files);