@@ -47,7 +47,6 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -86,9 +85,7 @@ public void testInvalidCleaningTriggerStrategy() {
         .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2)
         .withCleaningTriggerStrategy("invalid_strategy").build())
         .build();
-    Exception e = assertThrows(IllegalArgumentException.class, () -> {
-      runCleaner(config, true);
-    }, "should fail when invalid trigger strategy is provided!");
+    Exception e = assertThrows(IllegalArgumentException.class, () -> runCleaner(config, true), "should fail when invalid trigger strategy is provided!");
     assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.table.action.clean.CleaningTriggerStrategy.invalid_strategy"));
   }
 
@@ -272,36 +269,30 @@ public void testKeepLatestCommits(
   /**
    * Test Hudi COW Table Cleaner - Keep the latest file versions policy.
    */
-  @ParameterizedTest
-  @ValueSource(booleans = {false, true})
-  public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception {
+  @Test
+  public void testKeepLatestFileVersions() throws Exception {
     HoodieWriteConfig config =
-        HoodieWriteConfig.newBuilder().withPath(basePath)
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
-                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
-            .build();
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
+            .build();
 
     HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
     HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
 
     final String p0 = "2020/01/01";
     final String p1 = "2020/01/02";
-    final Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean
-        ? generateBootstrapIndexAndSourceData(p0, p1) : null;
 
     // make 1 commit, with 1 file per partition
-    final String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId()
-        : UUID.randomUUID().toString();
-    final String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId()
-        : UUID.randomUUID().toString();
+    final String file1P0C0 = UUID.randomUUID().toString();
+    final String file1P1C0 = UUID.randomUUID().toString();
 
     Map<String, List<Pair<String, Integer>>> c1PartitionToFilesNameLengthMap = new HashMap<>();
     c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100)));
     c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200)));
     testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1),
-            c1PartitionToFilesNameLengthMap, false, false);
+        c1PartitionToFilesNameLengthMap, false, false);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
@@ -315,58 +306,135 @@ public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception {
     c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100)));
     c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200)));
     testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(),
-            c2PartitionToFilesNameLengthMap, false, false);
+        c2PartitionToFilesNameLengthMap, false, false);
 
-    // enableBootstrapSourceClean would delete the bootstrap base file at the same time
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 1, true);
     HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0);
-    assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
-        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
-        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
-
-    if (enableBootstrapSourceClean) {
-      HoodieFileStatus fstatus =
-          bootstrapMapping.get(p0).get(0).getBootstrapFileStatus();
-      // This ensures full path is recorded in metadata.
-      assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
-          "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
-              + " but did not contain " + fstatus.getPath().getUri());
-      assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
-          p0).get(0).getBootstrapFileStatus().getPath().getUri())));
-    }
+    assertEquals(1, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
 
     cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
     assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
     assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
     assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
     assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-    assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
-        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
-        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
+    assertEquals(1, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
 
-    if (enableBootstrapSourceClean) {
-      HoodieFileStatus fstatus =
-          bootstrapMapping.get(p1).get(0).getBootstrapFileStatus();
-      // This ensures full path is recorded in metadata.
-      assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
-          "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
-              + " but did not contain " + fstatus.getPath().getUri());
-      assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
-          p1).get(0).getBootstrapFileStatus().getPath().getUri())));
-    }
-
+    // make next commit, with 2 updates to existing files, and 1 insert
+    final String file3P0C2 = UUID.randomUUID().toString();
+    Map<String, List<Pair<String, Integer>>> c3PartitionToFilesNameLengthMap = new HashMap<>();
+    c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101),
+        Pair.of(file3P0C2, 100)));
+    testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(),
+        c3PartitionToFilesNameLengthMap, false, false);
+
+    List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 3, true);
+    assertEquals(2,
+        getCleanStat(hoodieCleanStatsThree, p0)
+            .getSuccessDeleteFiles().size(), "Must clean two files");
+    assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
+    assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
+
+    // No cleaning on partially written file, with no commit.
+    testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2);
+
+    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
+    assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
+  }
+
+  @Test
+  public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception {
+    HoodieWriteConfig config =
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withCleanBootstrapBaseFileEnabled(true)
+                .withCleanerParallelism(1)
+                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
+            .build();
+
+    HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
+
+    final String p0 = "2020/01/01";
+    final String p1 = "2020/01/02";
+    final Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndexAndSourceData(p0, p1);
+
+    // make 1 commit, with 1 file per partition
+    final String file1P0C0 = bootstrapMapping.get(p0).get(0).getFileId();
+    final String file1P1C0 = bootstrapMapping.get(p1).get(0).getFileId();
+
+    Map<String, List<Pair<String, Integer>>> c1PartitionToFilesNameLengthMap = new HashMap<>();
+    c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100)));
+    c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200)));
+    testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1),
+        c1PartitionToFilesNameLengthMap, false, false);
+
+    List<HoodieCleanStat> hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next commit, with 1 insert & 1 update per partition
+    final String file2P0C1 = UUID.randomUUID().toString();
+    final String file2P1C1 = UUID.randomUUID().toString();
+    Map<String, List<Pair<String, Integer>>> c2PartitionToFilesNameLengthMap = new HashMap<>();
+    c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100)));
+    c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200)));
+    testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(),
+        c2PartitionToFilesNameLengthMap, false, false);
+
+    // should delete the bootstrap base file at the same time
+    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 1, true);
+    HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0);
+    assertEquals(2, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
+
+    HoodieFileStatus fstatus =
+        bootstrapMapping.get(p0).get(0).getBootstrapFileStatus();
+    // This ensures full path is recorded in metadata.
+    assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
+        "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+            + " but did not contain " + fstatus.getPath().getUri());
+    assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
+        p0).get(0).getBootstrapFileStatus().getPath().getUri())));
+
+    cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+    assertEquals(2, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
+
+    fstatus = bootstrapMapping.get(p1).get(0).getBootstrapFileStatus();
+    // This ensures full path is recorded in metadata.
+    assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
+        "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+            + " but did not contain " + fstatus.getPath().getUri());
+    assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
+        p1).get(0).getBootstrapFileStatus().getPath().getUri())));
+
     // make next commit, with 2 updates to existing files, and 1 insert
     final String file3P0C2 = UUID.randomUUID().toString();
     Map<String, List<Pair<String, Integer>>> c3PartitionToFilesNameLengthMap = new HashMap<>();
     c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101),
-            Pair.of(file3P0C2, 100)));
+        Pair.of(file3P0C2, 100)));
     testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(),
-            c3PartitionToFilesNameLengthMap, false, false);
+        c3PartitionToFilesNameLengthMap, false, false);
 
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 3, true);
     assertEquals(2,
-            getCleanStat(hoodieCleanStatsThree, p0)
-                    .getSuccessDeleteFiles().size(), "Must clean two files");
+        getCleanStat(hoodieCleanStatsThree, p0)
+            .getSuccessDeleteFiles().size(), "Must clean two files");
     assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
     assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
     assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));