From f639e485bf1897cd0a11c29bc38a9b77e7eac388 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 21 Jun 2022 20:27:33 +0530 Subject: [PATCH] [HUDI-4291] Fix flaky TestCleanPlanExecutor#testKeepLatestFileVersions --- .../functional/TestCleanPlanExecutor.java | 172 ++++++++++++------ 1 file changed, 120 insertions(+), 52 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java index 90d0f8835dd14..bd015baec9dbb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java @@ -47,7 +47,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -86,9 +85,7 @@ public void testInvalidCleaningTriggerStrategy() { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2) .withCleaningTriggerStrategy("invalid_strategy").build()) .build(); - Exception e = assertThrows(IllegalArgumentException.class, () -> { - runCleaner(config, true); - }, "should fail when invalid trigger strategy is provided!"); + Exception e = assertThrows(IllegalArgumentException.class, () -> runCleaner(config, true), "should fail when invalid trigger strategy is provided!"); assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.table.action.clean.CleaningTriggerStrategy.invalid_strategy")); } @@ -272,36 +269,30 @@ public void testKeepLatestCommits( /** * Test Hudi COW Table Cleaner - Keep the latest file versions policy. */ - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception { + @Test + public void testKeepLatestFileVersions() throws Exception { HoodieWriteConfig config = - HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) - .build(); + HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) + .build(); HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); final String p0 = "2020/01/01"; final String p1 = "2020/01/02"; - final Map> bootstrapMapping = enableBootstrapSourceClean - ? generateBootstrapIndexAndSourceData(p0, p1) : null; // make 1 commit, with 1 file per partition - final String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() - : UUID.randomUUID().toString(); - final String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() - : UUID.randomUUID().toString(); + final String file1P0C0 = UUID.randomUUID().toString(); + final String file1P1C0 = UUID.randomUUID().toString(); Map>> c1PartitionToFilesNameLengthMap = new HashMap<>(); c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100))); c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200))); testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1), - c1PartitionToFilesNameLengthMap, false, false); + c1PartitionToFilesNameLengthMap, false, false); List hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); @@ -315,58 +306,135 @@ public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throw c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100))); c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200))); testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(), - c2PartitionToFilesNameLengthMap, false, false); + c2PartitionToFilesNameLengthMap, false, false); // enableBootstrapSourceClean would delete the bootstrap base file at the same time List hoodieCleanStatsTwo = runCleaner(config, 1, true); HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0); - assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() - + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 - : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); - - if (enableBootstrapSourceClean) { - HoodieFileStatus fstatus = - bootstrapMapping.get(p0).get(0).getBootstrapFileStatus(); - // This ensures full path is recorded in metadata. - assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), - "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() - + " but did not contain " + fstatus.getPath().getUri()); - assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p0).get(0).getBootstrapFileStatus().getPath().getUri()))); - } + assertEquals(1, cleanStat.getSuccessDeleteFiles().size() + + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); - assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() - + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 - : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); + assertEquals(1, cleanStat.getSuccessDeleteFiles().size() + + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); - if (enableBootstrapSourceClean) { - HoodieFileStatus fstatus = - bootstrapMapping.get(p1).get(0).getBootstrapFileStatus(); - // This ensures full path is recorded in metadata. - assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), - "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() - + " but did not contain " + fstatus.getPath().getUri()); - assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p1).get(0).getBootstrapFileStatus().getPath().getUri()))); - } + // make next commit, with 2 updates to existing files, and 1 insert + final String file3P0C2 = UUID.randomUUID().toString(); + Map>> c3PartitionToFilesNameLengthMap = new HashMap<>(); + c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101), + Pair.of(file3P0C2, 100))); + testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(), + c3PartitionToFilesNameLengthMap, false, false); + + List hoodieCleanStatsThree = runCleaner(config, 3, true); + assertEquals(2, + getCleanStat(hoodieCleanStatsThree, p0) + .getSuccessDeleteFiles().size(), "Must clean two files"); + assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); + assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); + + // No cleaning on partially written file, with no commit. + testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2); + + List hoodieCleanStatsFour = runCleaner(config); + assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); + } + + @Test + public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception { + HoodieWriteConfig config = + HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanBootstrapBaseFileEnabled(true) + .withCleanerParallelism(1) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) + .build(); + + HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); + HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); + + final String p0 = "2020/01/01"; + final String p1 = "2020/01/02"; + final Map> bootstrapMapping = generateBootstrapIndexAndSourceData(p0, p1); + + // make 1 commit, with 1 file per partition + final String file1P0C0 = bootstrapMapping.get(p0).get(0).getFileId(); + final String file1P1C0 = bootstrapMapping.get(p1).get(0).getFileId(); + + Map>> c1PartitionToFilesNameLengthMap = new HashMap<>(); + c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100))); + c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200))); + testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1), + c1PartitionToFilesNameLengthMap, false, false); + + List hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next commit, with 1 insert & 1 update per partition + final String file2P0C1 = UUID.randomUUID().toString(); + final String file2P1C1 = UUID.randomUUID().toString(); + Map>> c2PartitionToFilesNameLengthMap = new HashMap<>(); + c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100))); + c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200))); + testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(), + c2PartitionToFilesNameLengthMap, false, false); + + // should delete the bootstrap base file at the same time + List hoodieCleanStatsTwo = runCleaner(config, 1, true); + HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0); + assertEquals(2, cleanStat.getSuccessDeleteFiles().size() + + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); + + HoodieFileStatus fstatus = + bootstrapMapping.get(p0).get(0).getBootstrapFileStatus(); + // This ensures full path is recorded in metadata. + assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), + "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + + " but did not contain " + fstatus.getPath().getUri()); + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p0).get(0).getBootstrapFileStatus().getPath().getUri()))); + + cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); + assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + assertEquals(2, cleanStat.getSuccessDeleteFiles().size() + + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); + + fstatus = bootstrapMapping.get(p1).get(0).getBootstrapFileStatus(); + // This ensures full path is recorded in metadata. + assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), + "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + + " but did not contain " + fstatus.getPath().getUri()); + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p1).get(0).getBootstrapFileStatus().getPath().getUri()))); // make next commit, with 2 updates to existing files, and 1 insert final String file3P0C2 = UUID.randomUUID().toString(); Map>> c3PartitionToFilesNameLengthMap = new HashMap<>(); c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101), - Pair.of(file3P0C2, 100))); + Pair.of(file3P0C2, 100))); testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(), - c3PartitionToFilesNameLengthMap, false, false); + c3PartitionToFilesNameLengthMap, false, false); List hoodieCleanStatsThree = runCleaner(config, 3, true); assertEquals(2, - getCleanStat(hoodieCleanStatsThree, p0) - .getSuccessDeleteFiles().size(), "Must clean two files"); + getCleanStat(hoodieCleanStatsThree, p0) + .getSuccessDeleteFiles().size(), "Must clean two files"); assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));