Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
// THere are delta-commits after compaction instant
// There are delta-commits after compaction instant
validateUnSchedulePlan(client, "000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are delta-commits after compaction instant
// There are delta-commits after compaction instant
validateUnSchedulePlan(client, "002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are no delta-commits after compaction instant
// There are no delta-commits after compaction instant
validateUnSchedulePlan(client, "004", "005", numEntriesPerInstant, 0);
// THere are no delta-commits after compaction instant
// There are no delta-commits after compaction instant
validateUnSchedulePlan(client, "006", "007", numEntriesPerInstant, 0);
}

Expand All @@ -106,13 +106,13 @@ public void testUnscheduleCompactionFileId() throws Exception {
}).map(instantWithPlan -> instantWithPlan.getRight().getOperations().stream()
.map(op -> Pair.of(instantWithPlan.getLeft(), CompactionOperation.convertFromAvroRecordInstance(op)))
.findFirst().get()).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
// THere are delta-commits after compaction instant
// There are delta-commits after compaction instant
validateUnScheduleFileId(client, "000", "001", instantsWithOp.get("001"), 2);
// THere are delta-commits after compaction instant
// There are delta-commits after compaction instant
validateUnScheduleFileId(client, "002", "003", instantsWithOp.get("003"), 2);
// THere are no delta-commits after compaction instant
// There are no delta-commits after compaction instant
validateUnScheduleFileId(client, "004", "005", instantsWithOp.get("005"), 0);
// THere are no delta-commits after compaction instant
// There are no delta-commits after compaction instant
validateUnScheduleFileId(client, "006", "007", instantsWithOp.get("007"), 0);
}

Expand All @@ -121,13 +121,13 @@ public void testRepairCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
// THere are delta-commits after compaction instant
// There are delta-commits after compaction instant
validateRepair("000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are delta-commits after compaction instant
// There are delta-commits after compaction instant
validateRepair("002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are no delta-commits after compaction instant
// There are no delta-commits after compaction instant
validateRepair("004", "005", numEntriesPerInstant, 0);
// THere are no delta-commits after compaction instant
// There are no delta-commits after compaction instant
validateRepair("006", "007", numEntriesPerInstant, 0);
}

Expand Down Expand Up @@ -163,15 +163,15 @@ private void validateRepair(String ingestionInstant, String compactionInstant, i
expRenameFiles.forEach((key, value) -> LOG.info("Key :" + key + " renamed to " + value + " rolled back to "
+ renameFilesFromUndo.get(key)));

assertEquals(expRenameFiles, renameFilesFromUndo, "Undo must completely rollback renames");
assertEquals(expRenameFiles, renameFilesFromUndo, "Undo must completely rollback renamed files");
// Now expect validation to succeed
result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
assertTrue(result.stream().allMatch(OperationResult::isSuccess), "Expect no failures in validation");
assertEquals(expNumRepairs, undoFiles.size(), "Expected Num Repairs");
}

/**
* Enssure compaction plan is valid.
* Ensure compaction plan is valid.
*
* @param compactionInstant Compaction Instant
*/
Expand Down Expand Up @@ -199,8 +199,8 @@ private void validateRenameFiles(List<Pair<HoodieLogFile, HoodieLogFile>> rename
renameFiles.forEach(lfPair -> {
HoodieLogFile oldLogFile = lfPair.getLeft();
HoodieLogFile newLogFile = lfPair.getValue();
assertEquals(ingestionInstant, newLogFile.getBaseCommitTime(), "Base Commit time is expected");
assertEquals(compactionInstant, oldLogFile.getBaseCommitTime(), "Base Commit time is expected");
assertEquals(ingestionInstant, newLogFile.getBaseCommitTime(), "Base Commit time of ingestion instant is expected");
assertEquals(compactionInstant, oldLogFile.getBaseCommitTime(), "Base Commit time of compaction instant is expected");
assertEquals(oldLogFile.getFileId(), newLogFile.getFileId(), "File Id is expected");
HoodieLogFile lastLogFileBeforeCompaction =
fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], ingestionInstant)
Expand Down Expand Up @@ -273,7 +273,7 @@ private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(Compacti
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
// Expect each file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.forEach(fs -> {
Expand All @@ -291,7 +291,7 @@ private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(Compacti
assertEquals(fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming,
"Each File Id has same number of log-files");
assertEquals(numEntriesPerInstant, fileIdToCountsAfterRenaming.size(), "Not Empty");
assertEquals(expNumRenames, renameFiles.size(), "Expected number of renames");
assertEquals(expNumRenames, renameFiles.size(), "Expected number of renamed files");
return renameFiles;
}

Expand Down Expand Up @@ -354,6 +354,6 @@ private void validateUnScheduleFileId(CompactionAdminClient client, String inges
assertEquals(fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming,
"Each File Id has same number of log-files");
assertEquals(1, fileIdToCountsAfterRenaming.size(), "Not Empty");
assertEquals(expNumRenames, renameFiles.size(), "Expected number of renames");
assertEquals(expNumRenames, renameFiles.size(), "Expected number of renamed files");
}
}