diff --git a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java index 84a9e21540f17..10fb0bcf21a10 100644 --- a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java @@ -55,6 +55,7 @@ public abstract class HoodieClientTestHarness implements Serializable { protected TemporaryFolder folder = null; protected transient HoodieTestDataGenerator dataGen = null; protected transient ExecutorService executorService; + protected transient HoodieTableMetaClient metaClient; //dfs protected String dfsBasePath; @@ -72,7 +73,7 @@ public void initResources() throws IOException { initSparkContexts(); initTestDataGenerator(); initFileSystem(); - initTableType(); + initMetaClient(); } /** @@ -80,7 +81,7 @@ public void initResources() throws IOException { * @throws IOException */ public void cleanupResources() throws IOException { - cleanupTableType(); + cleanupMetaClient(); cleanupSparkContexts(); cleanupTestDataGenerator(); cleanupFileSystem(); @@ -191,7 +192,7 @@ protected void cleanupFileSystem() throws IOException { * * @throws IOException */ - protected void initTableType() throws IOException { + protected void initMetaClient() throws IOException { if (basePath == null) { throw new IllegalStateException("The base path has not been initialized."); } @@ -200,14 +201,14 @@ protected void initTableType() throws IOException { throw new IllegalStateException("The Spark context has not been initialized."); } - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); + metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); } /** * Cleanups table type. 
*/ - protected void cleanupTableType() { - + protected void cleanupMetaClient() { + metaClient = null; } /** diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java index 1e8df4aef60ce..fe2549193a7e3 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java @@ -114,7 +114,7 @@ private String insertFirstBigBatchForClientCleanerTest( assertNoWriteErrors(statuses); // verify that there is a commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); // Should have 100 records in table (check using Index), all in locations marked at commit @@ -200,8 +200,8 @@ private void testInsertAndCleanByVersions( insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn); Map compactionFileIdToLatestFileSlice = new HashMap<>(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); for (String partitionPath : dataGen.getPartitionPaths()) { TableFileSystemView fsView = table.getFileSystemView(); Option added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst() @@ -239,8 +239,8 @@ private void testInsertAndCleanByVersions( // Verify there are no errors assertNoWriteErrors(statuses); - metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline(); TableFileSystemView fsView = table.getFileSystemView(); @@ -375,8 +375,8 @@ private void testInsertAndCleanByCommits( // Verify there are no errors assertNoWriteErrors(statuses); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc); HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline(); Option earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1); Set acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet()); @@ -424,9 +424,8 @@ public void testKeepLatestFileVersions() throws IOException { .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); String file1P1C0 = HoodieTestUtils .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); List hoodieCleanStatsOne = table.clean(jsc); 
assertEquals("Must not clean any files", 0, @@ -442,8 +441,8 @@ public void testKeepLatestFileVersions() throws IOException { // make next commit, with 1 insert & 1 update per partition HoodieTestUtils.createCommitFiles(basePath, "001"); - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); String file2P0C1 = HoodieTestUtils .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert @@ -472,8 +471,8 @@ public void testKeepLatestFileVersions() throws IOException { // make next commit, with 2 updates to existing files, and 1 insert HoodieTestUtils.createCommitFiles(basePath, "002"); - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), - config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieTestUtils .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update @@ -578,9 +577,8 @@ public void testKeepLatestCommits() throws IOException { String file1P1C0 = HoodieTestUtils .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); List hoodieCleanStatsOne = table.clean(jsc); assertEquals("Must not clean any files", 0, @@ -596,8 +594,8 @@ public void testKeepLatestCommits() throws IOException { // make next commit, with 1 insert & 1 update per partition HoodieTestUtils.createCommitFiles(basePath, "001"); - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), - config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); String file2P0C1 = HoodieTestUtils .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert @@ -626,8 +624,8 @@ public void testKeepLatestCommits() throws IOException { // make next commit, with 2 updates to existing files, and 1 insert HoodieTestUtils.createCommitFiles(basePath, "002"); - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), - config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieTestUtils .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update @@ -646,8 +644,8 @@ public void testKeepLatestCommits() throws IOException { // make next commit, with 2 updates to existing files, and 1 insert HoodieTestUtils.createCommitFiles(basePath, "003"); - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), - config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieTestUtils .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update @@ -700,9 +698,8 @@ public void 
testCleanMarkerDataFilesOnRollback() throws IOException { assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles()); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); table.rollback(jsc, "000", true); assertEquals("All temp files are deleted.", 0, getTotalTempFiles()); @@ -722,9 +719,8 @@ public void testCleaningWithZeroPartitonPaths() throws IOException { // with just some commit metadata, but no data/partitionPaths. HoodieTestUtils.createCommitFiles(basePath, "000"); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); List hoodieCleanStatsOne = table.clean(jsc); assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty()); @@ -783,9 +779,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003"); updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003"); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); List hoodieCleanStats = table.clean(jsc); assertEquals(100, @@ -890,9 +885,8 @@ public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDele for (int j = 1; j <= i; j++) { if (j == i && j <= maxNumFileIdsForCompaction) { expFileIdToPendingCompaction.put(fileId, compactionInstants[j]); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); FileSlice slice = table.getRTFileSystemView().getLatestFileSlices( HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get(); @@ -934,15 +928,13 @@ public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDele } // Clean now - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); List hoodieCleanStats = table.clean(jsc); // Test for safety - final HoodieTable hoodieTable = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); + final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient); + final HoodieTable hoodieTable = HoodieTable.getHoodieTable(newMetaClient, config, jsc); expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> { String fileId = entry.getKey(); @@ -961,7 +953,7 @@ public void
testPendingCompactions(HoodieWriteConfig config, int expNumFilesDele // Test for progress (Did we clean some files ?) long numFilesUnderCompactionDeleted = hoodieCleanStats.stream().flatMap(cleanStat -> { - return convertPathToFileIdWithCommitTime(metaClient, cleanStat.getDeletePathPatterns()).map( + return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()).map( fileIdWithCommitTime -> { if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) { Assert.assertTrue("Deleted instant time must be less than pending compaction", diff --git a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java index 20cc86c93354f..11504a47968ea 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java @@ -97,7 +97,7 @@ public void testSavepointAndRollback() throws Exception { assertNoWriteErrors(statuses); List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); final ReadOptimizedView view1 = table.getROFileSystemView(); @@ -122,7 +122,7 @@ public void testSavepointAndRollback() throws Exception { // Verify there are no errors assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); final ReadOptimizedView view2 = table.getROFileSystemView(); @@ -143,7 +143,7 @@ public void testSavepointAndRollback() throws Exception { HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get(); client.rollbackToSavepoint(savepoint.getTimestamp()); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); final ReadOptimizedView view3 = table.getROFileSystemView(); dataFiles = partitionPaths.stream().flatMap(s -> { diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java index 622395d3f8953..9efe708518b1b 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java @@ -53,14 +53,14 @@ public void setUp() throws Exception { initTempFolderAndPath(); basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH; // Initialize table - initTableType(); + initMetaClient(); } @After public void tearDown() throws Exception { cleanupSparkContexts(); cleanupTempFolderAndPath(); - cleanupTableType(); + cleanupMetaClient(); if (utility != null) { utility.shutdownMiniCluster(); } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java index 9d89186b9a2dc..6c2fc0f5b1665 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java @@ -40,7 
+40,6 @@ import org.apache.hudi.WriteStatus; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.config.HoodieCompactionConfig; @@ -104,9 +103,8 @@ public void setUp() throws Exception { // Create a temp folder as the base path initTempFolderAndPath(); - // Initialize table - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); initTestDataGenerator(); + initMetaClient(); } @After @@ -114,6 +112,7 @@ public void tearDown() throws Exception { cleanupSparkContexts(); cleanupTempFolderAndPath(); cleanupTestDataGenerator(); + cleanupMetaClient(); } private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { @@ -132,7 +131,7 @@ public void testSimpleTagLocationAndUpdate() throws Exception { HBaseIndex index = new HBaseIndex(config); try (HoodieWriteClient writeClient = getWriteClient(config);) { writeClient.startCommit(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); // Test tagLocation without any entries in index @@ -151,7 +150,7 @@ public void testSimpleTagLocationAndUpdate() throws Exception { // Now commit this & update location of records inserted and validate no errors writeClient.commit(newCommitTime, writeStatues); // Now tagLocation for these records, hbaseIndex should tag them correctly - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200); @@ -173,7 +172,7 @@ public void testTagLocationAndDuplicateUpdate() throws Exception { HBaseIndex index = new HBaseIndex(config); HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); writeClient.startCommit(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); @@ -185,7 +184,7 @@ public void testTagLocationAndDuplicateUpdate() throws Exception { // Now commit this & update location of records inserted and validate no errors writeClient.commit(newCommitTime, writeStatues); // Now tagLocation for these records, hbaseIndex should tag them correctly - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 10); @@ -205,7 +204,7 @@ public void testSimpleTagLocationAndUpdateWithRollback() throws Exception { String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); - HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); // Insert 200 records @@ -257,7 +256,7 @@ public void testTotalGetsBatching() throws Exception { String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 250); JavaRDD writeRecords = jsc.parallelize(records, 1); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); // Insert 250 records @@ -282,7 +281,7 @@ public void testTotalPutsBatching() throws Exception { String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 250); JavaRDD writeRecords = jsc.parallelize(records, 1); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); // Insert 200 records diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index d073757c570ff..4354216c3e192 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -36,14 +36,14 @@ public class TestHoodieIndex extends HoodieClientTestHarness { public void setUp() throws Exception { initSparkContexts("TestHoodieIndex"); initTempFolderAndPath(); - initTableType(); + initMetaClient(); } @After public void tearDown() throws Exception { cleanupSparkContexts(); cleanupTempFolderAndPath(); - cleanupTableType(); + cleanupMetaClient(); } @Test diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 28d19eb1cf1d8..69d0cfaf64b6c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -44,7 +44,6 @@ import org.apache.hudi.common.TestRawTripPayload; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FileIOUtils; @@ -92,10 +91,10 @@ public void setUp() throws Exception { initSparkContexts("TestHoodieBloomIndex"); initTempFolderAndPath(); initFileSystem(); - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); // We have some records to be tagged (two different partitions) schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); + initMetaClient(); } @After @@ -103,6 +102,7 @@ public void tearDown() throws Exception { cleanupSparkContexts(); cleanupFileSystem(); cleanupTempFolderAndPath(); + cleanupMetaClient(); } private HoodieWriteConfig makeConfig() { @@ -163,8 +163,8 @@ public void testLoadInvolvedFiles() throws IOException { false); List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); - HoodieTableMetaClient 
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); List> filesList = index.loadInvolvedFiles(partitions, jsc, table); // Still 0, as no valid commit assertEquals(filesList.size(), 0); @@ -174,7 +174,7 @@ public void testLoadInvolvedFiles() throws IOException { new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); - table = HoodieTable.getHoodieTable(metadata, config, jsc); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); filesList = index.loadInvolvedFiles(partitions, jsc, table); assertEquals(filesList.size(), 4); @@ -286,9 +286,9 @@ public void testTagLocationWithEmptyRDD() throws Exception { // We have some records to be tagged (two different partitions) JavaRDD recordRDD = jsc.emptyRDD(); // Also create the metadata and config - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieWriteConfig config = makeConfig(); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); @@ -331,9 +331,9 @@ public void testTagLocation() throws Exception { JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); // Also create the metadata and config - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieWriteConfig config = makeConfig(); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); @@ -353,8 +353,8 @@ public void testTagLocation() throws Exception { HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); // We do the tag again - metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); @@ -401,9 +401,9 @@ public void testCheckExists() throws Exception { JavaRDD keysRDD = jsc.parallelize(Arrays.asList(key1, key2, key3, key4)); // Also create the metadata and config - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieWriteConfig config = makeConfig(); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); @@ -424,8 +424,8 @@ public void testCheckExists() throws Exception { HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); // We do the tag again - metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - table = HoodieTable.getHoodieTable(metadata, 
config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table); // Check results @@ -473,9 +473,9 @@ public void testBloomFilterFalseError() throws IOException, InterruptedException // We do the tag JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2)); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieWriteConfig config = makeConfig(); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 9993cb47012f9..11669a3b6c1d9 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -40,7 +40,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FileIOUtils; @@ -66,16 +65,17 @@ public TestHoodieGlobalBloomIndex() throws Exception { public void setUp() throws Exception { initSparkContexts("TestHoodieGlobalBloomIndex"); initTempFolderAndPath(); - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); // We have some records to be tagged (two different partitions) schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); + initMetaClient(); } @After public void tearDown() throws Exception { cleanupSparkContexts(); cleanupTempFolderAndPath(); + cleanupMetaClient(); } @Test @@ -127,8 +127,8 @@ public void testLoadInvolvedFiles() throws IOException { // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up List partitions = Arrays.asList("2016/01/21", "2016/04/01"); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); // partitions will NOT be respected by this loadInvolvedFiles(...) 
call List> filesList = index.loadInvolvedFiles(partitions, jsc, table); // Still 0, as no valid commit @@ -139,7 +139,7 @@ public void testLoadInvolvedFiles() throws IOException { new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); - table = HoodieTable.getHoodieTable(metadata, config, jsc); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); filesList = index.loadInvolvedFiles(partitions, jsc, table); assertEquals(filesList.size(), 4); @@ -264,8 +264,8 @@ public void testTagLocation() throws Exception { HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false); // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); // Add some commits diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index 5dee6f635c67a..9a9ddc09ed2b8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -54,6 +54,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { private Configuration hadoopConf; + private HoodieTableMetaClient metaClient; @Before public void init() throws Exception { @@ -63,7 +64,7 @@ public void init() throws Exception { hadoopConf = dfs.getConf(); jsc.hadoopConfiguration().addResource(dfs.getConf()); dfs.mkdirs(new Path(basePath)); - HoodieTestUtils.init(hadoopConf, basePath); + metaClient = HoodieTestUtils.init(hadoopConf, basePath); } @After @@ -78,8 +79,8 @@ public void testArchiveEmptyDataset() throws IOException { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").build(); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, - new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true)); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); } @@ -135,7 +136,7 @@ public void testArchiveDatasetWithArchival() throws IOException { new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf()); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); @@ -158,8 +159,8 @@ public void testArchiveDatasetWithArchival() throws IOException { // verify in-flight instants before archive verifyInflightInstants(metaClient, 3); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, - new HoodieTableMetaClient(dfs.getConf(), basePath, true)); + metaClient = 
HoodieTableMetaClient.reload(metaClient); + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); assertTrue(archiveLog.archiveIfRequired(jsc)); @@ -235,7 +236,7 @@ public void testArchiveDatasetWithNoArchival() throws IOException { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); // Requested Compaction HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, @@ -302,7 +303,7 @@ public void testArchiveCommitSafety() throws IOException { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); @@ -328,7 +329,7 @@ public void testArchiveCommitSavepointNoHole() throws IOException { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); @@ -360,7 +361,7 @@ public void testArchiveCommitCompactionNoHole() throws IOException { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable("test-trip-table").withCompactionConfig( HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf()); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java index 8d9a6762d9de2..0a02b70197c14 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java @@ -52,6 +52,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { private Configuration hadoopConf; + private HoodieTableMetaClient metaClient; @Before public void setUp() throws Exception { @@ -62,7 +63,7 @@ public void setUp() throws Exception { initTempFolderAndPath(); hadoopConf = 
HoodieTestUtils.getDefaultHadoopConf(); fs = FSUtils.getFs(basePath, hadoopConf); - HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); initTestDataGenerator(); } @@ -96,9 +97,7 @@ private HoodieWriteConfig.Builder getConfigBuilder() { @Test(expected = HoodieNotSupportedException.class) public void testCompactionOnCopyOnWriteFail() throws Exception { - HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime(); table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime)); @@ -106,8 +105,8 @@ public void testCompactionOnCopyOnWriteFail() throws Exception { @Test public void testCompactionEmpty() throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieWriteConfig config = getConfig(); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); try (HoodieWriteClient writeClient = getWriteClient(config);) { @@ -136,7 +135,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception { List statuses = writeClient.insert(recordsRDD, newCommitTime).collect(); // Update all the 100 records - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); newCommitTime = "101"; @@ -153,7 +152,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception { updatedRecords); // Verify that all data file has one log file - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); table = HoodieTable.getHoodieTable(metaClient, config, jsc); for (String partitionPath : dataGen.getPartitionPaths()) { List groupedLogFiles = table.getRTFileSystemView().getLatestFileSlices(partitionPath) @@ -164,7 +163,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception { } // Do a compaction - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); table = HoodieTable.getHoodieTable(metaClient, config, jsc); String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime(); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 288e04caa154f..68db6ca674b42 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -58,17 +58,17 @@ public void setUp() throws Exception { initSparkContexts("TestHoodieMergeHandle"); initTempFolderAndPath(); initFileSystem(); - initTableType(); initTestDataGenerator(); + initMetaClient(); } @After public void tearDown() throws Exception { - cleanupTableType(); cleanupFileSystem(); cleanupTestDataGenerator(); cleanupTempFolderAndPath(); cleanupSparkContexts(); + 
cleanupMetaClient(); } private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { @@ -109,7 +109,7 @@ public void testUpsertsForMultipleRecordsInSameFile() throws Exception { assertNoWriteErrors(statuses); // verify that there is a commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); @@ -137,7 +137,7 @@ public void testUpsertsForMultipleRecordsInSameFile() throws Exception { assertNoWriteErrors(statuses); // verify that there are 2 commits - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); assertEquals("Expecting two commits.", 2, timeline.findInstantsAfter("000", Integer.MAX_VALUE) .countInstants()); @@ -161,7 +161,7 @@ public void testUpsertsForMultipleRecordsInSameFile() throws Exception { assertNoWriteErrors(statuses); // verify that there are now 3 commits - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); assertEquals("Expecting three commits.", 3, timeline.findInstantsAfter("000", Integer.MAX_VALUE) .countInstants()); @@ -259,7 +259,7 @@ public void testHoodieMergeHandleWriteStatMetrics() throws Exception { .map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a + b).get(), 100); // Update all the 100 records - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); newCommitTime = "101"; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index 6439d75208a39..2c46fa9636607 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -73,7 +73,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { public void setUp() throws Exception { initSparkContexts("TestCopyOnWriteTable"); initTempFolderAndPath(); - initTableType(); + initMetaClient(); initTestDataGenerator(); initFileSystem(); } @@ -82,7 +82,7 @@ public void setUp() throws Exception { public void tearDown() throws Exception { cleanupSparkContexts(); cleanupTempFolderAndPath(); - cleanupTableType(); + cleanupMetaClient(); cleanupFileSystem(); cleanupTestDataGenerator(); } @@ -94,7 +94,7 @@ public void testMakeNewPath() throws Exception { String commitTime = HoodieTestUtils.makeNewCommitTime(); HoodieWriteConfig config = makeHoodieClientConfig(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); Pair newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> { @@ -127,7 +127,7 @@ public void testUpdateRecords() throws Exception { // Prepare the AvroParquetIO HoodieWriteConfig config = 
makeHoodieClientConfig(); String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); String partitionPath = "/2016/01/31"; HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); @@ -203,7 +203,7 @@ public void testUpdateRecords() throws Exception { Thread.sleep(1000); String newCommitTime = HoodieTestUtils.makeNewCommitTime(); - metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc); List statuses = jsc.parallelize(Arrays.asList(1)).map(x -> { @@ -271,7 +271,7 @@ public void testMetadataAggregateFromWriteStatus() throws Exception { HoodieWriteConfig config = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class) .build(); String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); @@ -308,8 +308,8 @@ public void testMetadataAggregateFromWriteStatus() throws Exception { public void testInsertRecords() throws Exception { HoodieWriteConfig config = makeHoodieClientConfig(); String commitTime = HoodieTestUtils.makeNewCommitTime(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + metaClient = HoodieTableMetaClient.reload(metaClient); // Case 1: // 10 records for partition 1, 1 record for partition 2. 
@@ -362,7 +362,7 @@ public void testFileSizeUpsertRecords() throws Exception { HoodieStorageConfig.newBuilder().limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024) .build()).build(); String commitTime = HoodieTestUtils.makeNewCommitTime(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); List records = new ArrayList<>(); @@ -401,8 +401,7 @@ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts HoodieClientTestUtils.fakeCommitFile(basePath, "001"); HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize); - - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath}); @@ -476,7 +475,7 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { public void testInsertUpsertWithHoodieAvroPayload() throws Exception { HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig( HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); String commitTime = "000"; // Perform inserts of 100 records to test CreateHandle and BufferedExecutor @@ -487,7 +486,7 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception { WriteStatus writeStatus = ws.get(0).get(0); String fileId = writeStatus.getFileId(); - metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close(); + metaClient.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close(); final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc); final List updates = diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index 833b39ae3d6e4..788c7839939c4 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -156,7 +156,7 @@ public void testSimpleInsertAndUpdate() throws Exception { statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); assertTrue(deltaCommit.isPresent()); assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp()); @@ -173,7 +173,7 @@ public void testSimpleInsertAndUpdate() throws Exception { assertTrue(dataFilesToRead.findAny().isPresent()); // verify that there is a commit - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); 
assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); @@ -270,7 +270,7 @@ public void testSimpleInsertUpdateAndDelete() throws Exception { // Verify there are no errors assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); assertTrue(deltaCommit.isPresent()); assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp()); @@ -335,7 +335,7 @@ public void testCOWToMORConvertedDatasetRollback() throws Exception { //rollback a COW commit when TableType is MOR client.rollback(newCommitTime); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, @@ -454,7 +454,7 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception { Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName() .contains(commitTime2)).collect(Collectors.toList()).size(), 0); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); @@ -477,14 +477,14 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception { // Verify there are no errors assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString(); JavaRDD ws = thirdClient.compact(compactionInstantTime); thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); List dataFiles2 = roView.getLatestDataFiles().collect(Collectors.toList()); @@ -504,7 +504,7 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception { thirdClient.rollback(compactedCommitTime); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); @@ -603,7 +603,7 @@ public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { // Verify there are no errors assertNoWriteErrors(statuses); - metaClient = 
new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); String compactionInstantTime = "004"; allCommits.add(compactionInstantTime); @@ -626,7 +626,7 @@ public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { // Verify there are no errors assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); compactionInstantTime = "006"; allCommits.add(compactionInstantTime); @@ -635,7 +635,7 @@ public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { client.commitCompaction(compactionInstantTime, ws, Option.empty()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant() @@ -669,7 +669,7 @@ public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { // Rollback latest commit first client.restoreToInstant("000"); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); @@ -754,7 +754,7 @@ public void testUpsertPartitioner() throws Exception { // Verify there are no errors assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = HoodieTableMetaClient.reload(metaClient); deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); assertTrue(deltaCommit.isPresent()); assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp()); @@ -811,7 +811,7 @@ public void testLogFileCountsAfterCompaction() throws Exception { HoodieTestDataGenerator.avroSchemaWithMetadataFields, updatedRecords); // Verify that all data file has one log file - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); table = HoodieTable.getHoodieTable(metaClient, config, jsc); // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state ((SyncableFileSystemView) (table.getRTFileSystemView())).reset(); @@ -833,7 +833,7 @@ public void testLogFileCountsAfterCompaction() throws Exception { JavaRDD result = writeClient.compact(compactionInstantTime); // Verify that recently written compacted data file has no log file - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + metaClient = HoodieTableMetaClient.reload(metaClient); table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); @@ -949,7 +949,7 @@ public void testInsertsGeneratedIntoLogFilesRollback() throws Exception { .copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath())); writeClient.rollback(newCommitTime); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + 
metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); RealtimeView tableRTFileSystemView = table.getRTFileSystemView(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 479db692e4b9b..1d2ac37cb2729 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -71,6 +71,7 @@ public class HoodieTableMetaClient implements Serializable { private String basePath; private transient HoodieWrapperFileSystem fs; private String metaPath; + private boolean loadActiveTimelineOnLoad; private SerializableConfiguration hadoopConf; private HoodieTableType tableType; private HoodieTableConfig tableConfig; @@ -104,6 +105,7 @@ public HoodieTableMetaClient(Configuration conf, String basePath, this.tableConfig = new HoodieTableConfig(fs, metaPath); this.tableType = tableConfig.getTableType(); log.info("Finished Loading Table of type " + tableType + " from " + basePath); + this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad; if (loadActiveTimelineOnLoad) { log.info("Loading Active commit timeline for " + basePath); getActiveTimeline(); @@ -118,6 +120,14 @@ public HoodieTableMetaClient(Configuration conf, String basePath, public HoodieTableMetaClient() { } + public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) { + return new HoodieTableMetaClient( + oldMetaClient.hadoopConf.get(), + oldMetaClient.basePath, + oldMetaClient.loadActiveTimelineOnLoad, + oldMetaClient.consistencyGuardConfig); + } + /** * This method is only used when this object is deserialized in a spark executor. *
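
For reviewers, the test-side pattern this patch converges on is sketched below. This is an illustrative example, not code taken from the patch: the test class name, the test method and its assertions are hypothetical, while initMetaClient(), cleanupMetaClient(), the shared metaClient field and HoodieTableMetaClient.reload(...) are the pieces introduced above. Instead of constructing a fresh HoodieTableMetaClient before every check, tests now refresh the harness-managed instance.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestSomeHoodieFeature extends HoodieClientTestHarness {

  @Before
  public void setUp() throws Exception {
    initSparkContexts("TestSomeHoodieFeature");
    initTempFolderAndPath();
    initMetaClient();            // replaces the old initTableType() / HoodieTestUtils.init(...) calls
  }

  @After
  public void tearDown() throws Exception {
    cleanupSparkContexts();
    cleanupTempFolderAndPath();
    cleanupMetaClient();         // nulls out the shared metaClient field
  }

  @Test
  public void testSomethingAfterACommit() throws Exception {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    // ... perform a commit with a HoodieWriteClient ...

    // Before this patch: new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath)
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // ... assertions against table / metaClient.getActiveTimeline() ...
  }
}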