diff --git a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
index fcfbd736e1380..97da1fef9820d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
@@ -48,7 +48,6 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -106,17 +105,15 @@ public void testRollbackForInflightCompaction() throws Exception {
       HoodieInstant pendingCompactionInstant =
           metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertTrue("Pending Compaction instant has expected instant time",
-          pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
-      assertTrue("Pending Compaction instant has expected state",
-          pendingCompactionInstant.getState().equals(State.REQUESTED));
+      assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
+          compactionInstantTime);
+      assertEquals("Pending Compaction instant has expected state", pendingCompactionInstant.getState(), State.REQUESTED);
 
-      moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg);
+      moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
 
       // Reload and rollback inflight compaction
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-      hoodieTable.rollback(jsc, compactionInstantTime, false);
 
       client.rollbackInflightCompaction(
           new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
@@ -137,11 +134,6 @@ public void testRollbackForInflightCompaction() throws Exception {
     }
   }
 
-  private Path getInstantPath(HoodieTableMetaClient metaClient, String timestamp, String action, State state) {
-    HoodieInstant instant = new HoodieInstant(state, action, timestamp);
-    return new Path(metaClient.getMetaPath(), instant.getFileName());
-  }
-
   @Test
   public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
     // Rollback inflight ingestion when there is pending compaction
@@ -169,12 +161,11 @@ public void testRollbackInflightIngestionWithPendingCompaction() throws Exceptio
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieInstant pendingCompactionInstant =
           metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertTrue("Pending Compaction instant has expected instant time",
-          pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
+      assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
+          compactionInstantTime);
       HoodieInstant inflightInstant =
           metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
-      assertTrue("inflight instant has expected instant time",
-          inflightInstant.getTimestamp().equals(inflightInstantTime));
+      assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);
 
       // This should rollback
       client.startCommitWithTime(nextInflightInstantTime);
@@ -182,14 +173,13 @@ public void testRollbackInflightIngestionWithPendingCompaction() throws Exceptio
       // Validate
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
-      assertTrue("inflight instant has expected instant time",
-          inflightInstant.getTimestamp().equals(nextInflightInstantTime));
-      assertTrue("Expect only one inflight instant",
-          metaClient.getActiveTimeline().filterInflightsExcludingCompaction().getInstants().count() == 1);
+      assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), nextInflightInstantTime);
+      assertEquals("Expect only one inflight instant", 1, metaClient.getActiveTimeline()
+          .filterInflightsExcludingCompaction().getInstants().count());
 
       // Expect pending Compaction to be present
       pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertTrue("Pending Compaction instant has expected instant time",
-          pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
+      assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
+          compactionInstantTime);
     }
   }
@@ -215,7 +205,7 @@ public void testInflightCompaction() throws Exception {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       scheduleCompaction(compactionInstantTime, client, cfg);
-      moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg);
+      moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
 
       // Complete ingestions
       runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false,
@@ -243,13 +233,11 @@ public void testScheduleIngestionBeforePendingCompaction() throws Exception {
           new ArrayList<>());
 
       // Schedule compaction but do not run them
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       scheduleCompaction(compactionInstantTime, client, cfg);
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieInstant pendingCompactionInstant =
           metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertTrue("Pending Compaction instant has expected instant time",
-          pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
+      assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(), compactionInstantTime);
 
       boolean gotException = false;
       try {
@@ -285,8 +273,7 @@ public void testScheduleCompactionAfterPendingIngestion() throws Exception {
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieInstant inflightInstant =
           metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
-      assertTrue("inflight instant has expected instant time",
-          inflightInstant.getTimestamp().equals(inflightInstantTime));
+      assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);
 
       boolean gotException = false;
       try {
@@ -312,10 +299,9 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
       int numRecs = 2000;
       List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
-      records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
+      runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       boolean gotException = false;
       try {
         // Schedule compaction but do not run them
@@ -327,10 +313,9 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
       // Schedule with timestamp same as that of committed instant
       gotException = false;
-      String dupCompactionInstantTime = secondInstantTime;
       try {
         // Schedule compaction but do not run them
-        scheduleCompaction(dupCompactionInstantTime, client, cfg);
+        scheduleCompaction(secondInstantTime, client, cfg);
       } catch (IllegalArgumentException iex) {
         gotException = true;
       }
@@ -341,7 +326,7 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
       gotException = false;
       try {
         // Schedule compaction with the same times as a pending compaction
-        scheduleCompaction(dupCompactionInstantTime, client, cfg);
+        scheduleCompaction(secondInstantTime, client, cfg);
       } catch (IllegalArgumentException iex) {
         gotException = true;
       }
@@ -352,7 +337,7 @@ public void testCompactionAfterTwoDeltaCommits() throws Exception {
     // No Delta Commits after compaction request
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true)) {
       String firstInstantTime = "001";
       String secondInstantTime = "004";
@@ -360,7 +345,7 @@ public void testCompactionAfterTwoDeltaCommits() throws Exception {
       int numRecs = 2000;
       List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
-      records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
+      runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -403,15 +388,14 @@ public void testInterleavedCompaction() throws Exception {
 
   private void validateDeltaCommit(String latestDeltaCommit,
       final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
-      HoodieWriteConfig cfg) throws IOException {
+      HoodieWriteConfig cfg) {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     HoodieTable table = getHoodieTable(metaClient, cfg);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
     fileSliceList.forEach(fileSlice -> {
       Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
       if (opPair != null) {
-        assertTrue("Expect baseInstant to match compaction Instant",
-            fileSlice.getBaseInstantTime().equals(opPair.getKey()));
+        assertEquals("Expect baseInstant to match compaction Instant", fileSlice.getBaseInstantTime(), opPair.getKey());
         assertTrue("Expect atleast one log file to be present where the latest delta commit was written",
             fileSlice.getLogFiles().count() > 0);
         assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent());
@@ -467,12 +451,9 @@ private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, List
in.getTimestamp().equals(compactionInstantTime)).findAny().get();
@@ -497,19 +478,19 @@ private void executeCompaction(String compactionInstantTime, HoodieWriteClient c
       HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
     client.compact(compactionInstantTime);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
     assertTrue("Ensure latest file-slices are not empty", fileSliceList.stream().findAny().isPresent());
     assertFalse("Verify all file-slices have base-instant same as compaction instant", fileSliceList.stream()
-        .filter(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)).findAny().isPresent());
+        .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)));
     assertFalse("Verify all file-slices have data-files",
-        fileSliceList.stream().filter(fs -> !fs.getDataFile().isPresent()).findAny().isPresent());
+        fileSliceList.stream().anyMatch(fs -> !fs.getDataFile().isPresent()));
     if (hasDeltaCommitAfterPendingCompaction) {
       assertFalse("Verify all file-slices have atleast one log-file",
-          fileSliceList.stream().filter(fs -> fs.getLogFiles().count() == 0).findAny().isPresent());
+          fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0));
     } else {
       assertFalse("Verify all file-slices have no log-files",
-          fileSliceList.stream().filter(fs -> fs.getLogFiles().count() > 0).findAny().isPresent());
+          fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0));
     }
 
     // verify that there is a commit
@@ -552,16 +533,14 @@ private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, Hoodie
     FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
     HoodieTableFileSystemView view =
         new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
-    List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList());
-    return dataFilesToRead;
+    return view.getLatestDataFiles().collect(Collectors.toList());
   }
 
-  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
+  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
     HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
         table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
-    List<FileSlice> fileSliceList = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).stream()
-        .flatMap(partition -> view.getLatestFileSlices(partition)).collect(Collectors.toList());
-    return fileSliceList;
+    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
+        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
   }
 
   protected HoodieTableType getTableType() {