diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index a95cc53df329c..5d58aa9d2e498 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -25,9 +25,6 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 
 import java.io.IOException;
-import java.text.ParseException;
-import java.time.Instant;
-import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.Date;
 import java.util.List;
@@ -53,16 +50,4 @@ public static String getTimeDaysAgo(int numberOfDays) {
     Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
     return HoodieActiveTimeline.formatDate(date);
   }
-
-  /**
-   * Add hours to specified time. If hours <0, this acts as remove hours.
-   * example, say compactionCommitTime: "20200202020000"
-   * a) hours: +1, returns 20200202030000
-   * b) hours: -1, returns 20200202010000
-   */
-  public static String addHours(String compactionCommitTime, int hours) throws ParseException {
-    Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
-    ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
-    return HoodieActiveTimeline.formatDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 65b36fcb62233..130d379f278d4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -83,6 +83,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
           + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
 
+  public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
+      .defaultValue("24")
+      .withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option, "
+          + "compared to the number of commits retained, for the cleaning service. Setting this property ensures that all files "
+          + "in a file group, except the latest one, corresponding to commits older than the configured number of hours are cleaned.");
+
   public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
       .key("hoodie.cleaner.policy")
       .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
@@ -585,6 +591,11 @@ public Builder retainCommits(int commitsRetained) {
       return this;
     }
 
+    public Builder cleanerNumHoursRetained(int cleanerHoursRetained) {
+      compactionConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained));
+      return this;
+    }
+
     public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
       compactionConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
       compactionConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e3efe418f1cbe..be07b1ef29c5f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1082,6 +1082,10 @@ public int getCleanerCommitsRetained() {
     return getInt(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED);
   }
 
+  public int getCleanerHoursRetained() {
+    return getInt(HoodieCompactionConfig.CLEANER_HOURS_RETAINED);
+  }
+
   public int getMaxCommitsToKeep() {
     return getInt(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP);
   }
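
The three changes above (the config key, its builder setter, and the typed getter) are the entire user-facing surface of the new setting. For orientation, a minimal usage sketch of the builder API introduced in this patch, assuming the usual imports; the table path and the 48-hour value are placeholders:

    // Illustrative only: opt into time-based cleaning, retaining 48 hours of commits.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")                              // placeholder path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS)  // new enum value
            .cleanerNumHoursRetained(48)                                   // builder method added above
            .build())
        .build();
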
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 27937af880c40..7e56d3456a0a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -33,6 +33,7 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -50,8 +51,12 @@
 import java.io.IOException;
 import java.io.Serializable;
 
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -123,6 +128,7 @@ public Stream<String> getSavepointedDataFiles(String savepointTime) {
   public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
     switch (config.getCleanerPolicy()) {
       case KEEP_LATEST_COMMITS:
+      case KEEP_LATEST_BY_HOURS:
         return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
       case KEEP_LATEST_FILE_VERSIONS:
         return getPartitionPathsForFullCleaning();
@@ -251,6 +257,10 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitio
     return deletePaths;
   }
 
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
+    return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+  }
+
   /**
    * Selects the versions for file for cleaning, such that it
    *
@@ -265,8 +275,7 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitio
    *
    * This policy is the default.
    */
-  private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
-    int commitsRetained = config.getCleanerCommitsRetained();
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
 
     List<CleanFileInfo> deletePaths = new ArrayList<>();
@@ -303,14 +312,24 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
           // do not clean up a savepoint data file
           continue;
         }
-        // Dont delete the latest commit and also the last commit before the earliest commit we
-        // are retaining
-        // The window of commit retain == max query run time. So a query could be running which
-        // still
-        // uses this file.
-        if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-          // move on to the next file
-          continue;
+
+        if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+          // Don't delete the latest commit, and also the last commit before the earliest
+          // commit we are retaining. The window of commits retained == max query run time,
+          // so a query could still be running that uses this file.
+          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+            // move on to the next file
+            continue;
+          }
+        } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+          // This block corresponds to the KEEP_LATEST_BY_HOURS policy:
+          // do not delete the latest commit.
+          if (fileCommitTime.equals(lastVersion)) {
+            // move on to the next file
+            continue;
+          }
         }
 
         // Always keep the last commit
@@ -334,6 +353,18 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
     }
     return deletePaths;
   }
+
+  /**
+   * Finds the files to be cleaned based on the number of hours retained. For example, if
+   * {@code config.getCleanerHoursRetained()} is set to 5, all files except the latest one in each
+   * file group, belonging to commits with commit times earlier than 5 hours ago, are removed.
+   * Compared to the KEEP_LATEST_COMMITS cleaning policy, this gives users much more flexibility
+   * in retaining data for running incremental queries. The default number of hours retained is 24.
+   *
+   * @param partitionPath partition path to check
+   * @return list of files to clean
+   */
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestHours(String partitionPath) {
+    return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
+  }
 
   private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
     final Stream<HoodieFileGroup> replacedGroups;
@@ -392,6 +423,8 @@ public List<CleanFileInfo> getDeletePaths(String partitionPath) {
       deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
       deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+      deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
     } else {
       throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
    }
@@ -406,9 +439,16 @@ public List<CleanFileInfo> getDeletePaths(String partitionPath) {
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     Option<HoodieInstant> earliestCommitToRetain = Option.empty();
     int commitsRetained = config.getCleanerCommitsRetained();
+    int hoursRetained = config.getCleanerHoursRetained();
     if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
         && commitTimeline.countInstants() > commitsRetained) {
-      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
+      // e.g. with 15 instants in total and 10 commits to retain, this picks the 6th instant in the list
+      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
+    } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+      Instant instant = Instant.now();
+      ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+      String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
+      earliestCommitToRetain = Option.fromJavaOptional(commitTimeline.getInstants().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(),
+          HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
     }
     return earliestCommitToRetain;
   }
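
The hours-based branch of getEarliestCommitToRetain() above reduces to: format now() minus N hours as an instant-time string, then pick the first instant at or after that cutoff. A standalone sketch of that arithmetic, assuming Hudi's usual fixed-width yyyyMMddHHmmss instant-time format (illustrative only, not the planner's actual code path):

    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    public class HoursRetainedCutoffSketch {
      // Assumed instant-time layout; fixed-width digits make lexicographic order chronological.
      private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

      public static void main(String[] args) {
        int hoursRetained = 24; // the default of hoodie.cleaner.hours.retained
        List<String> commitTimes = Arrays.asList("20210501080000", "20210502090000", "20210503100000");
        String earliestTimeToRetain = ZonedDateTime.now().minusHours(hoursRetained).format(FMT);
        // The first instant at or after the cutoff becomes the earliest commit to retain;
        // older commits are candidates for cleaning (except each file group's latest file).
        Optional<String> earliestCommitToRetain = commitTimes.stream()
            .filter(ts -> ts.compareTo(earliestTimeToRetain) >= 0)
            .findFirst();
        System.out.println("cutoff=" + earliestTimeToRetain + " retainFrom=" + earliestCommitToRetain);
      }
    }
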
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index f1c83e188cd65..bec0b08edbac8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -569,7 +569,7 @@ private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int firstComm
     return runCleaner(config, false, firstCommitSequence);
   }
 
-  private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
+  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
     return runCleaner(config, simulateRetryFailure, 1);
   }
 
@@ -1182,7 +1182,7 @@ private static void assertCleanMetadataPathEquals(Map expected,
     }
   }
 
-  private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
+  protected static Stream<Arguments> argumentsForTestKeepLatestCommits() {
     return Stream.of(
         Arguments.of(false, false, false),
         Arguments.of(true, false, false),
@@ -1328,7 +1328,7 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
    * @return Partition to BootstrapFileMapping Map
    * @throws IOException
    */
-  private Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
+  protected Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
     // create bootstrap source data path
     java.nio.file.Path sourcePath = tempDir.resolve("data");
     java.nio.file.Files.createDirectories(sourcePath);
@@ -1680,7 +1680,7 @@ private Stream convertPathToFileIdWithCommitTime(final Hoo
     return Stream.concat(stream1, stream2);
   }
 
-  private static HoodieCommitMetadata generateCommitMetadata(
+  protected static HoodieCommitMetadata generateCommitMetadata(
       String instantTime, Map<String, List<String>> partitionToFilePaths) {
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
     partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> {
+ */ + +package org.apache.hudi.table.functional; + +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.BootstrapFileMapping; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.TestCleaner; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestCleanPlanExecutor extends TestCleaner { + + /** + * Tests cleaning service based on number of hours retained. + */ + @ParameterizedTest + @MethodSource("argumentsForTestKeepLatestCommits") + public void testKeepXHoursWithCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withIncrementalCleaningMode(enableIncrementalClean) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2).build()) + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + String p1 = "2020/01/02"; + Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null; + + String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() + : UUID.randomUUID().toString(); + String file1P1C0 = enableBootstrapSourceClean ? 
bootstrapMapping.get(p1).get(0).getFileId() + : UUID.randomUUID().toString(); + Instant instant = Instant.now(); + ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault()); + int minutesForFirstCommit = 150; + String firstCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFirstCommit).toInstant())); + testTable.addInflightCommit(firstCommitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); + + HoodieCommitMetadata commitMetadata = generateCommitMetadata(firstCommitTs, + Collections.unmodifiableMap(new HashMap>() { + { + put(p0, CollectionUtils.createImmutableList(file1P0C0)); + put(p1, CollectionUtils.createImmutableList(file1P1C0)); + } + }) + ); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, firstCommitTs), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); + assertTrue(testTable.baseFileExists(p0, firstCommitTs, file1P0C0)); + assertTrue(testTable.baseFileExists(p1, firstCommitTs, file1P1C0)); + + // make next commit, with 1 insert & 1 update per partition + int minutesForSecondCommit = 90; + String secondCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForSecondCommit).toInstant())); + Map partitionAndFileId002 = testTable.addInflightCommit(secondCommitTs).getFileIdsWithBaseFilesInPartitions(p0, p1); + String file2P0C1 = partitionAndFileId002.get(p0); + String file2P1C1 = partitionAndFileId002.get(p1); + testTable.forCommit(secondCommitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); + commitMetadata = generateCommitMetadata(secondCommitTs, new HashMap>() { + { + put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); + put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); + } + }); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, secondCommitTs), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); + assertEquals(2, hoodieCleanStatsTwo.size(), "Should clean one file each from both the partitions"); + assertTrue(testTable.baseFileExists(p0, secondCommitTs, file2P0C1)); + assertTrue(testTable.baseFileExists(p1, secondCommitTs, file2P1C1)); + assertTrue(testTable.baseFileExists(p0, secondCommitTs, file1P0C0)); + assertTrue(testTable.baseFileExists(p1, secondCommitTs, file1P1C0)); + assertFalse(testTable.baseFileExists(p0, firstCommitTs, file1P0C0)); + assertFalse(testTable.baseFileExists(p1, firstCommitTs, file1P1C0)); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java index 647232fb7e3a9..58b9f7475a35f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java @@ -22,5 +22,5 @@ * Hoodie cleaning policies. 
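
The retention window in this test is two hours, so the arithmetic behind the assertions is worth spelling out: the first commit is backdated 150 minutes (outside the window, so its older file versions become clean candidates) and the second is backdated 90 minutes (inside the window, so everything it wrote is retained). A trivial worked sketch of that reasoning (illustrative only):

    // Worked timing behind testKeepXHoursWithCleaning (illustrative).
    public class KeepHoursTimingSketch {
      public static void main(String[] args) {
        int retentionMinutes = 2 * 60;   // cleanerNumHoursRetained(2)
        int firstCommitAgeMinutes = 150; // older than the window -> cleanable (latest file version excepted)
        int secondCommitAgeMinutes = 90; // inside the window -> fully retained
        System.out.println("first commit cleanable: " + (firstCommitAgeMinutes > retentionMinutes));   // true
        System.out.println("second commit cleanable: " + (secondCommitAgeMinutes > retentionMinutes)); // false
      }
    }
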
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java
index 647232fb7e3a9..58b9f7475a35f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java
@@ -22,5 +22,5 @@
  * Hoodie cleaning policies.
  */
 public enum HoodieCleaningPolicy {
-  KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_COMMITS;
+  KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_COMMITS, KEEP_LATEST_BY_HOURS;
 }
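
Beyond the Java builder, the same behavior should be reachable through the raw keys added in this patch wherever Hudi accepts write configs as properties. A hedged sketch; the two keys and the enum name come straight from this diff, while the surrounding setup is illustrative:

    // Raw-key configuration sketch (e.g. for properties handed to a Hudi writer).
    java.util.Properties props = new java.util.Properties();
    props.setProperty("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS");
    props.setProperty("hoodie.cleaner.hours.retained", "12"); // overrides the default of 24
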