@@ -55,6 +55,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
protected TemporaryFolder folder = null;
protected transient HoodieTestDataGenerator dataGen = null;
protected transient ExecutorService executorService;
protected transient HoodieTableMetaClient metaClient;

//dfs
protected String dfsBasePath;
@@ -72,15 +73,15 @@ public void initResources() throws IOException {
initSparkContexts();
initTestDataGenerator();
initFileSystem();
initTableType();
initMetaClient();
}

/**
* Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
* @throws IOException
*/
public void cleanupResources() throws IOException {
cleanupTableType();
cleanupMetaClient();
cleanupSparkContexts();
cleanupTestDataGenerator();
cleanupFileSystem();
@@ -191,7 +192,7 @@ protected void cleanupFileSystem() throws IOException {
*
* @throws IOException
*/
protected void initTableType() throws IOException {
protected void initMetaClient() throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
@@ -200,14 +201,14 @@ protected void initTableType() throws IOException {
throw new IllegalStateException("The Spark context has not been initialized.");
}

HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
}

/**
* Cleanups table type.
*/
protected void cleanupTableType() {

protected void cleanupMetaClient() {
metaClient = null;
}

/**
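For readers following the harness change above, here is a minimal, hypothetical sketch of how a subclass would interact with the new metaClient field. The class and test names are made up for illustration; initResources()/cleanupResources(), initMetaClient()/cleanupMetaClient(), HoodieTableMetaClient.reload(...), and the protected metaClient and basePath fields come from this change, while the package paths in the imports are assumed from the surrounding test layout.

package org.apache.hudi; // assumed: same test package as the harness in this diff

import org.apache.hudi.common.table.HoodieTableMetaClient; // import path assumed
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

// Hypothetical subclass; it only illustrates the harness lifecycle and is not part of this PR.
public class ExampleHarnessUsageTest extends HoodieClientTestHarness {

  @Before
  public void setUp() throws Exception {
    // initResources() now calls initMetaClient() (in place of initTableType()), which runs
    // HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()) and stores
    // the returned client in the shared metaClient field.
    initResources();
  }

  @Test
  public void sharedMetaClientIsAvailable() {
    assertNotNull(metaClient);
    assertEquals(basePath, metaClient.getBasePath());

    // After a test writes new commits, it refreshes the same field instead of
    // constructing a fresh HoodieTableMetaClient from jsc.hadoopConfiguration().
    metaClient = HoodieTableMetaClient.reload(metaClient);
  }

  @After
  public void tearDown() throws Exception {
    // cleanupResources() now calls cleanupMetaClient() (in place of cleanupTableType()),
    // which nulls out the shared field.
    cleanupResources();
  }
}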
76 changes: 34 additions & 42 deletions hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -114,7 +114,7 @@ private String insertFirstBigBatchForClientCleanerTest(
assertNoWriteErrors(statuses);

// verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
// Should have 100 records in table (check using Index), all in locations marked at commit
@@ -200,8 +200,8 @@ private void testInsertAndCleanByVersions(
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);

Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
TableFileSystemView fsView = table.getFileSystemView();
Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst()
@@ -239,8 +239,8 @@ private void testInsertAndCleanByVersions(
// Verify there are no errors
assertNoWriteErrors(statuses);

metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();

TableFileSystemView fsView = table.getFileSystemView();
@@ -375,8 +375,8 @@ private void testInsertAndCleanByCommits(
// Verify there are no errors
assertNoWriteErrors(statuses);

HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
@@ -424,9 +424,8 @@ public void testKeepLatestFileVersions() throws IOException {
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -442,8 +441,8 @@ public void testKeepLatestFileVersions() throws IOException {

// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -472,8 +471,8 @@ public void testKeepLatestFileVersions() throws IOException {

// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -578,9 +577,8 @@ public void testKeepLatestCommits() throws IOException {
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");

HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -596,8 +594,8 @@ public void testKeepLatestCommits() throws IOException {

// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -626,8 +624,8 @@ public void testKeepLatestCommits() throws IOException {

// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -646,8 +644,8 @@ public void testKeepLatestCommits() throws IOException {

// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "003");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
@@ -700,9 +698,8 @@ public void testCleanMarkerDataFilesOnRollback() throws IOException {
assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles());

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

table.rollback(jsc, "000", true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
@@ -722,9 +719,8 @@ public void testCleaningWithZeroPartitonPaths() throws IOException {
// with just some commit metadata, but no data/partitionPaths.
HoodieTestUtils.createCommitFiles(basePath, "000");

HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty());
@@ -783,9 +779,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003");
updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003");

HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);

assertEquals(100,
@@ -890,9 +885,8 @@ public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDele
for (int j = 1; j <= i; j++) {
if (j == i && j <= maxNumFileIdsForCompaction) {
expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
FileSlice slice = table.getRTFileSystemView().getLatestFileSlices(
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
@@ -934,15 +928,13 @@ public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDele
}

// Clean now
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);

// Test for safety
final HoodieTable hoodieTable = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);

expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> {
String fileId = entry.getKey();
Expand All @@ -961,7 +953,7 @@ public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDele
// Test for progress (Did we clean some files ?)
long numFilesUnderCompactionDeleted =
hoodieCleanStats.stream().flatMap(cleanStat -> {
return convertPathToFileIdWithCommitTime(metaClient, cleanStat.getDeletePathPatterns()).map(
return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()).map(
fileIdWithCommitTime -> {
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
Assert.assertTrue("Deleted instant time must be less than pending compaction",
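The TestCleaner changes above all apply the same substitution; shown side by side (both forms are taken directly from this diff), the per-assertion construction of a HoodieTableMetaClient becomes a refresh of the field the harness now owns:

// Before: every check built a brand-new client from the Spark context's Hadoop configuration.
HoodieTable table = HoodieTable.getHoodieTable(
    new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);

// After: refresh the shared harness field, then hand it to the table factory.
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

Keeping a single metaClient in the harness removes the repeated jsc.hadoopConfiguration()/config.getBasePath() plumbing from each test, while reload(...) is there so each refresh still observes commits written since the previous call.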
@@ -97,7 +97,7 @@ public void testSavepointAndRollback() throws Exception {
assertNoWriteErrors(statuses);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(),
getConfig().shouldAssumeDatePartitioning());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view1 = table.getROFileSystemView();

@@ -122,7 +122,7 @@ public void testSavepointAndRollback() throws Exception {
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view2 = table.getROFileSystemView();

@@ -143,7 +143,7 @@ public void testSavepointAndRollback() throws Exception {
HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.rollbackToSavepoint(savepoint.getTimestamp());

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view3 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> {
@@ -53,14 +53,14 @@ public void setUp() throws Exception {
initTempFolderAndPath();
basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
// Initialize table
initTableType();
initMetaClient();
}

@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupTableType();
cleanupMetaClient();
if (utility != null) {
utility.shutdownMiniCluster();
}