diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index ba7c71b135684..b494df42b49fb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -111,6 +111,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
       LOG.info("Nothing to clean here. It is already clean");
       return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
     }
+    LOG.info("Earliest commit to retain for clean : " + (earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null"));
     LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
     int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index be949fedb37e9..80aa7b31624b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -509,7 +509,7 @@ public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     return CleanerUtils.getEarliestCommitToRetain(
-        hoodieTable.getMetaClient().getActiveTimeline().getCommitsTimeline(),
+        hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
         config.getCleanerPolicy(),
         config.getCleanerCommitsRetained(),
         Instant.now(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index d1e7761369104..17a12dcc7ff2a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -260,6 +261,97 @@ private void testInsertAndCleanFailedWritesByVersions(
     }
   }
 
+  /**
+   * Tests that the earliest commit to retain is earlier than the first pending compaction instant in incremental cleaning scenarios.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testEarliestInstantToRetainForPendingCompaction() throws IOException {
+    HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath)
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false)
+            .build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withAutoClean(false)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(1)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .compactionSmallFileSize(1024 * 1024 * 1024)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false)
+            .archiveCommitsWith(2, 3)
+            .build())
+        .withEmbeddedTimelineServerEnabled(false).build();
+
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+
+      final String partition1 = "2023/06/01";
+      final String partition2 = "2023/06/02";
+      String instantTime = "";
+      String earliestInstantToRetain = "";
+
+      for (int idx = 0; idx < 3; ++idx) {
+        instantTime = HoodieActiveTimeline.createNewInstantTime();
+        if (idx == 2) {
+          earliestInstantToRetain = instantTime;
+        }
+        List<HoodieRecord> records = dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc.parallelize(records, 1), instantTime).collect();
+      }
+
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+      Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
+      assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition1).size(), 1);
+      assertEquals(earliestInstantToRetain, cleanPlan.get().getEarliestInstantToRetain().getTimestamp(),
+          "clean until " + earliestInstantToRetain);
+      table.getMetaClient().reloadActiveTimeline();
+      table.clean(context, instantTime);
+
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+      client.startCommitWithTime(instantTime);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      client.insert(recordsRDD, instantTime).collect();
+
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      earliestInstantToRetain = instantTime;
+      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(instantTime, records);
+      JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+      SparkRDDReadClient readClient = new SparkRDDReadClient(context, writeConfig);
+      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
+      client.startCommitWithTime(instantTime);
+      client.upsertPreppedRecords(updatedTaggedRecordsRDD, instantTime).collect();
+
+      table.getMetaClient().reloadActiveTimeline();
+      // pending compaction
+      String compactionInstantTime = client.scheduleCompaction(Option.empty()).get().toString();
+
+      for (int idx = 0; idx < 3; ++idx) {
+        instantTime = HoodieActiveTimeline.createNewInstantTime();
+        records = dataGen.generateInsertsForPartition(instantTime, 1, partition2);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc.parallelize(records, 1), instantTime).collect();
+      }
+
+      // The earliest commit to retain should be earlier than the first pending compaction in incremental cleaning scenarios.
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
+      assertEquals(earliestInstantToRetain, cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
+    }
+  }
+
   /**
    * Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
    */
@@ -777,16 +869,17 @@ public void testKeepLatestCommitsWithPendingCompactions(boolean isAsync) throws
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build())
         .build();
+
     // Deletions:
-    // . FileId   Base  Logs  Total  Retained Commits
-    //   FileId7  5     10    15     009, 011
-    //   FileId6  5     10    15     009
-    //   FileId5  3     6     9      005
-    //   FileId4  2     4     6      003
-    //   FileId3  1     2     3      001
-    //   FileId2  0     0     0      000
-    //   FileId1  0     0     0      000
-    testPendingCompactions(config, 48, 18, false);
+    // . FileId   Base  Logs  Total  Retained_Commits  Under_Compaction
+    //   FileId7  1     2     3      001,003           false
+    //   FileId6  1     2     3      001,003           false
+    //   FileId5  1     2     3      001,003           true
+    //   FileId4  1     2     3      001,003           true
+    //   FileId3  1     2     3      001               true
+    //   FileId2  0     0     0      000               true
+    //   FileId1  0     0     0      000               false
+    testPendingCompactions(config, 15, 9, false);
   }
 
   /**
@@ -801,15 +894,16 @@ public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) t
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
         .build();
+
     // Deletions:
-    // . FileId   Base  Logs  Total  Retained Commits
-    //   FileId7  5     10    15     009, 011
-    //   FileId6  4     8     12     007, 009
-    //   FileId5  2     4     6      003 005
-    //   FileId4  1     2     3      001, 003
-    //   FileId3  0     0     0      000, 001
-    //   FileId2  0     0     0      000
-    //   FileId1  0     0     0      000
+    // . FileId   Base  Logs  Total  Retained_Commits  Under_Compaction
+    //   FileId7  5     10    15     009,013           false
+    //   FileId6  4     8     12     007,009           false
+    //   FileId5  2     4     6      003,005           true
+    //   FileId4  1     2     3      001,003           true
+    //   FileId3  0     0     0      000,001           true
+    //   FileId2  0     0     0      000               true
+    //   FileId1  0     0     0      000               false
     testPendingCompactions(config, 36, 9, retryFailure);
   }
 
@@ -1005,23 +1099,24 @@ private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDel
     HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
     final String partition = "2016/03/15";
 
+    String timePrefix = "00000000000";
     Map<String, String> expFileIdToPendingCompaction = new HashMap<String, String>() {
       {
-        put("fileId2", "004");
-        put("fileId3", "006");
-        put("fileId4", "008");
-        put("fileId5", "010");
+        put("fileId2", timePrefix + "004");
+        put("fileId3", timePrefix + "006");
+        put("fileId4", timePrefix + "008");
+        put("fileId5", timePrefix + "010");
       }
     };
     Map<String, String> fileIdToLatestInstantBeforeCompaction = new HashMap<String, String>() {
       {
-        put("fileId1", "000");
-        put("fileId2", "000");
-        put("fileId3", "001");
-        put("fileId4", "003");
-        put("fileId5", "005");
-        put("fileId6", "009");
-        put("fileId7", "011");
+        put("fileId1", timePrefix + "000");
+        put("fileId2", timePrefix + "000");
+        put("fileId3", timePrefix + "001");
+        put("fileId4", timePrefix + "003");
+        put("fileId5", timePrefix + "005");
+        put("fileId6", timePrefix + "009");
+        put("fileId7", timePrefix + "013");
       }
     };
@@ -1047,60 +1142,60 @@ private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDel
     Map<String, List<String>> part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file1P1, file2P1, file3P1, file4P1, file5P1, file6P1, file7P1)); // all 7 fileIds
-    commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "000", part1ToFileId, testTable, metadataWriter, true, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file3P1, file4P1, file5P1, file6P1, file7P1)); // fileIds 3 to 7
-    commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "001", part1ToFileId, testTable, metadataWriter, true, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file4P1, file5P1, file6P1, file7P1)); // fileIds 4 to 7
-    commitWithMdt("003", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "003", part1ToFileId, testTable, metadataWriter, true, true);
 
     // add compaction
-    testTable.addRequestedCompaction("004", new FileSlice(partition, "000", file2P1));
+    testTable.addRequestedCompaction(timePrefix + "004", new FileSlice(partition, timePrefix + "000", file2P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file2P1));
-    commitWithMdt("005", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "005", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file5P1, file6P1, file7P1));
-    commitWithMdt("0055", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0055", part1ToFileId, testTable, metadataWriter, true, true);
 
-    testTable.addRequestedCompaction("006", new FileSlice(partition, "001", file3P1));
+    testTable.addRequestedCompaction(timePrefix + "006", new FileSlice(partition, timePrefix + "001", file3P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file3P1));
-    commitWithMdt("007", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "007", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-    commitWithMdt("0075", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0075", part1ToFileId, testTable, metadataWriter, true, true);
 
-    testTable.addRequestedCompaction("008", new FileSlice(partition, "003", file4P1));
+    testTable.addRequestedCompaction(timePrefix + "008", new FileSlice(partition, timePrefix + "003", file4P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file4P1));
-    commitWithMdt("009", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "009", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-    commitWithMdt("0095", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0095", part1ToFileId, testTable, metadataWriter, true, true);
 
-    testTable.addRequestedCompaction("010", new FileSlice(partition, "005", file5P1));
+    testTable.addRequestedCompaction(timePrefix + "010", new FileSlice(partition, timePrefix + "005", file5P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file5P1));
-    commitWithMdt("011", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "011", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file7P1));
-    commitWithMdt("013", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "013", part1ToFileId, testTable, metadataWriter, true, true);
 
     // Clean now
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, retryFailure);
+    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, 14, true);
 
     // Test for safety
     final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 8c4a5cb377e45..6182bc4d4ebe7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -267,6 +267,13 @@ public HoodieTimeline getCommitsTimeline() {
     return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
   }
 
+  /**
+   * Get all instants (commits, delta commits, replace commits, compactions) that produce new data or merge files, in the active timeline.
+   */
+  public HoodieTimeline getCommitsAndCompactionTimeline() {
+    return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION));
+  }
+
   /**
   * Get all instants (commits, delta commits, compaction, clean, savepoint, rollback, replace commits, index) that result in actions,
   * in the active timeline.
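For readers skimming the patch, the sketch below is an illustrative, standalone summary of the behavior the new test pins down; the class and method names are hypothetical and this is not Hudi's CleanerUtils implementation. It shows why the planner now derives the retain boundary from a timeline that also contains compaction instants: under KEEP_LATEST_COMMITS, the boundary obtained by counting back over completed commits is additionally held back to the first pending compaction, so the file slices that compaction still needs are not cleaned away.

// Illustrative sketch only: hypothetical names, not Hudi's CleanerUtils API.
// It mirrors the expectation in testEarliestInstantToRetainForPendingCompaction:
// the clean boundary never moves past the first pending compaction instant.
import java.util.List;
import java.util.Optional;

public class EarliestRetainSketch {

  // commits: completed commit timestamps in ascending order.
  // commitsRetained: the KEEP_LATEST_COMMITS retention count.
  // earliestPendingCompaction: requested/inflight compaction timestamp, if any.
  static Optional<String> earliestInstantToRetain(List<String> commits, int commitsRetained,
      Optional<String> earliestPendingCompaction) {
    if (commits.size() <= commitsRetained) {
      return Optional.empty(); // nothing is old enough to clean
    }
    // Counting back over completed commits alone would retain from this instant onwards.
    String byCount = commits.get(commits.size() - commitsRetained);
    // Hold the boundary back to the first pending compaction, if it is earlier.
    return Optional.of(earliestPendingCompaction
        .filter(compaction -> compaction.compareTo(byCount) < 0)
        .orElse(byCount));
  }

  public static void main(String[] args) {
    List<String> commits = List.of("001", "003", "005", "007", "009");
    // No pending compaction: retaining 2 commits keeps "007" and "009".
    System.out.println(earliestInstantToRetain(commits, 2, Optional.empty()));   // Optional[007]
    // A compaction requested at "005" pulls the boundary back so its inputs survive.
    System.out.println(earliestInstantToRetain(commits, 2, Optional.of("005"))); // Optional[005]
  }
}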