Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 34 additions & 55 deletions hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
Expand Down Expand Up @@ -106,17 +105,15 @@ public void testRollbackForInflightCompaction() throws Exception {

HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time",
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
assertTrue("Pending Compaction instant has expected state",
pendingCompactionInstant.getState().equals(State.REQUESTED));
assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
compactionInstantTime);
assertEquals("Pending Compaction instant has expected state", pendingCompactionInstant.getState(), State.REQUESTED);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEquals() is the best fit for these assertions.


moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);

// Reload and rollback inflight compaction
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
hoodieTable.rollback(jsc, compactionInstantTime, false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call is redundant; `hoodieTable.rollback(jsc, compactionInstantTime, false)` is already invoked as the first statement inside `client.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable)`.


client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
Expand All @@ -137,11 +134,6 @@ public void testRollbackForInflightCompaction() throws Exception {
}
}

private Path getInstantPath(HoodieTableMetaClient metaClient, String timestamp, String action, State state) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not used anywhere, so it can be removed.

HoodieInstant instant = new HoodieInstant(state, action, timestamp);
return new Path(metaClient.getMetaPath(), instant.getFileName());
}

@Test
public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
// Rollback inflight ingestion when there is pending compaction
Expand Down Expand Up @@ -169,27 +161,25 @@ public void testRollbackInflightIngestionWithPendingCompaction() throws Exceptio
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time",
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
compactionInstantTime);
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
assertTrue("inflight instant has expected instant time",
inflightInstant.getTimestamp().equals(inflightInstantTime));
assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);

// This should rollback
client.startCommitWithTime(nextInflightInstantTime);

// Validate
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
assertTrue("inflight instant has expected instant time",
inflightInstant.getTimestamp().equals(nextInflightInstantTime));
assertTrue("Expect only one inflight instant",
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().getInstants().count() == 1);
assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), nextInflightInstantTime);
assertEquals("Expect only one inflight instant", 1, metaClient.getActiveTimeline()
.filterInflightsExcludingCompaction().getInstants().count());
// Expect pending Compaction to be present
pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time",
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
compactionInstantTime);
}
}

Expand All @@ -215,7 +205,7 @@ public void testInflightCompaction() throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);

// Complete ingestions
runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false,
Expand Down Expand Up @@ -243,13 +233,11 @@ public void testScheduleIngestionBeforePendingCompaction() throws Exception {
new ArrayList<>());

// Schedule compaction but do not run them
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
scheduleCompaction(compactionInstantTime, client, cfg);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time",
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(), compactionInstantTime);

boolean gotException = false;
try {
Expand Down Expand Up @@ -285,8 +273,7 @@ public void testScheduleCompactionAfterPendingIngestion() throws Exception {
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
assertTrue("inflight instant has expected instant time",
inflightInstant.getTimestamp().equals(inflightInstantTime));
assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);

boolean gotException = false;
try {
Expand All @@ -312,10 +299,9 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
int numRecs = 2000;

List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assigning the return value to `records` is redundant; the reassigned value is never used afterwards.

runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `metaClient` variable is not used anywhere, so it can be removed.

boolean gotException = false;
try {
// Schedule compaction but do not run them
Expand All @@ -327,10 +313,9 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {

// Schedule with timestamp same as that of committed instant
gotException = false;
String dupCompactionInstantTime = secondInstantTime;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is redundant; `secondInstantTime` can be used directly at both call sites.

try {
// Schedule compaction but do not run them
scheduleCompaction(dupCompactionInstantTime, client, cfg);
scheduleCompaction(secondInstantTime, client, cfg);
} catch (IllegalArgumentException iex) {
gotException = true;
}
Expand All @@ -341,7 +326,7 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
gotException = false;
try {
// Schedule compaction with the same times as a pending compaction
scheduleCompaction(dupCompactionInstantTime, client, cfg);
scheduleCompaction(secondInstantTime, client, cfg);
} catch (IllegalArgumentException iex) {
gotException = true;
}
Expand All @@ -352,15 +337,15 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
public void testCompactionAfterTwoDeltaCommits() throws Exception {
// No Delta Commits after compaction request
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true)) {

String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
int numRecs = 2000;

List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assigning the return value to `records` is unnecessary; the reassigned value is never used afterwards.

runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
Expand Down Expand Up @@ -403,15 +388,14 @@ public void testInterleavedCompaction() throws Exception {

private void validateDeltaCommit(String latestDeltaCommit,
final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
HoodieWriteConfig cfg) throws IOException {
HoodieWriteConfig cfg) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable table = getHoodieTable(metaClient, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
fileSliceList.forEach(fileSlice -> {
Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
if (opPair != null) {
assertTrue("Expect baseInstant to match compaction Instant",
fileSlice.getBaseInstantTime().equals(opPair.getKey()));
assertEquals("Expect baseInstant to match compaction Instant", fileSlice.getBaseInstantTime(), opPair.getKey());
assertTrue("Expect atleast one log file to be present where the latest delta commit was written",
fileSlice.getLogFiles().count() > 0);
assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent());
Expand Down Expand Up @@ -467,12 +451,9 @@ private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, List<St
return records;
}

private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteClient client,
HoodieWriteConfig cfg) throws IOException {
private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
HoodieCompactionPlan workload = AvroUtils
.deserializeCompactionPlan(metaClient.getActiveTimeline().getInstantAuxiliaryDetails(compactionInstant).get());
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
.filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
Expand All @@ -497,19 +478,19 @@ private void executeCompaction(String compactionInstantTime, HoodieWriteClient c
HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {

client.compact(compactionInstantTime);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
assertTrue("Ensure latest file-slices are not empty", fileSliceList.stream().findAny().isPresent());
assertFalse("Verify all file-slices have base-instant same as compaction instant", fileSliceList.stream()
.filter(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)).findAny().isPresent());
.anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)));
assertFalse("Verify all file-slices have data-files",
fileSliceList.stream().filter(fs -> !fs.getDataFile().isPresent()).findAny().isPresent());
fileSliceList.stream().anyMatch(fs -> !fs.getDataFile().isPresent()));

if (hasDeltaCommitAfterPendingCompaction) {
assertFalse("Verify all file-slices have atleast one log-file",
fileSliceList.stream().filter(fs -> fs.getLogFiles().count() == 0).findAny().isPresent());
fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0));
} else {
assertFalse("Verify all file-slices have no log-files",
fileSliceList.stream().filter(fs -> fs.getLogFiles().count() > 0).findAny().isPresent());
fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0));
}

// verify that there is a commit
Expand Down Expand Up @@ -552,16 +533,14 @@ private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, Hoodie
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
HoodieTableFileSystemView view =
new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList());
return dataFilesToRead;
return view.getLatestDataFiles().collect(Collectors.toList());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlined the local variable, since it was only used in the return statement.

}

private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
List<FileSlice> fileSliceList = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).stream()
.flatMap(partition -> view.getLatestFileSlices(partition)).collect(Collectors.toList());
return fileSliceList;
return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
.flatMap(view::getLatestFileSlices).collect(Collectors.toList());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above — inlined the local variable that was only used in the return statement.

}

protected HoodieTableType getTableType() {
Expand Down