@@ -167,7 +167,7 @@ protected HoodieCleanMetadata runClean(HoodieTable<T> table, String cleanInstant
// Create the metadata and save it
HoodieCleanMetadata metadata =
AvroUtils.convertCleanMetadata(cleanInstant.getTimestamp(), durationInMs, cleanStats);
logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files");
logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());

table.getActiveTimeline().transitionCleanInflightToComplete(
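The earliest retained commit surfaced in this log line is also persisted in the clean metadata on the timeline; the incremental cleaner added below in HoodieCleanHelper reads it back from the last completed clean to bound its partition lookup.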
@@ -43,6 +43,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max" + ".delta.commits";
public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions" + ".retained";
public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
public static final String CLEANER_INCREMENTAL_MODE = "hoodie.cleaner.incremental.mode";
public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
@@ -92,6 +93,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
private static final String DEFAULT_AUTO_CLEAN = "true";
private static final String DEFAULT_INLINE_COMPACT = "false";
private static final String DEFAULT_INCREMENTAL_CLEANER = "false";
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "1";
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
@@ -136,6 +138,11 @@ public Builder withAutoClean(Boolean autoClean) {
return this;
}

public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
props.setProperty(CLEANER_INCREMENTAL_MODE, String.valueOf(incrementalCleaningMode));
return this;
}

public Builder withInlineCompaction(Boolean inlineCompaction) {
props.setProperty(INLINE_COMPACT_PROP, String.valueOf(inlineCompaction));
return this;
@@ -235,6 +242,8 @@ public Builder withCommitsArchivalBatchSize(int batchSize) {
public HoodieCompactionConfig build() {
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
setDefaultOnCondition(props, !props.containsKey(CLEANER_INCREMENTAL_MODE), CLEANER_INCREMENTAL_MODE,
DEFAULT_INCREMENTAL_CLEANER);
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), INLINE_COMPACT_PROP,
DEFAULT_INLINE_COMPACT);
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP),
@@ -239,6 +239,10 @@ public boolean isAutoClean() {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
}

public boolean incrementalCleanerModeEnabled() {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE));
}

public boolean isInlineCompaction() {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_PROP));
}
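Taken together, the changes above wire a new flag, hoodie.cleaner.incremental.mode, through the HoodieCompactionConfig builder and expose it on the write config. A minimal sketch of enabling it (the class name and table path are placeholders of mine; the builder and accessor calls are the ones shown in this diff, and the import for HoodieCompactionConfig assumes the same org.apache.hudi.config package as HoodieWriteConfig):

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class IncrementalCleanConfigSketch {
  public static void main(String[] args) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table") // placeholder base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withIncrementalCleaningMode(true) // sets hoodie.cleaner.incremental.mode
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
            .retainCommits(2)
            .build())
        .build();
    // The cleaner consults this accessor before attempting the incremental lookup.
    System.out.println(config.incrementalCleanerModeEnabled()); // prints: true
  }
}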
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -37,9 +38,12 @@
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -76,6 +80,46 @@ public HoodieCleanHelper(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

/**
* Returns the list of partitions where clean operations need to be performed.
* @param newInstantToRetain New instant to be retained after this cleanup operation
* @return list of partitions to scan for cleaning
* @throws IOException when the underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
&& (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
Option<HoodieInstant> lastClean =
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
HoodieCleanMetadata cleanMetadata = AvroUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
logger.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> {
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER);
}).flatMap(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream();
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).distinct().collect(Collectors.toList());
}
}
}
// Otherwise go to brute force mode of scanning all partitions
return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
}
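The filter above selects exactly the commits in the half-open window [earliestCommitToRetain of the last clean, newInstantToRetain) and unions the partitions they wrote to, so unchanged partitions are never listed. A self-contained illustration of that window logic (plain Java, not Hudi code; HoodieTimeline.compareTimestamps on these fixed-width instant strings behaves like lexicographic comparison):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IncrementalWindowSketch {
  public static void main(String[] args) {
    String lastRetained = "001"; // earliestCommitToRetain recorded by the previous clean
    String newRetained = "003";  // earliest instant the upcoming clean will retain
    List<String> commits = Arrays.asList("000", "001", "002", "003", "004");
    // GREATER_OR_EQUAL to the old bound, LESSER than the new bound, as in the filter above
    List<String> toScan = commits.stream()
        .filter(ts -> ts.compareTo(lastRetained) >= 0 && ts.compareTo(newRetained) < 0)
        .collect(Collectors.toList());
    System.out.println(toScan); // [001, 002] -> only partitions written by these commits are scanned
  }
}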

/**
* Selects the older versions of files for cleaning, such that it bounds the number of versions of each file. This
* policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
@@ -278,10 +278,11 @@ public Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Inte
*/
public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
try {
FileSystem fs = getMetaClient().getFs();
HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();

List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);

List<String> partitionsToClean =
FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
if (partitionsToClean.isEmpty()) {
logger.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
@@ -290,8 +291,6 @@ public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
"Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
logger.info("Using cleanerParallelism: " + cleanerParallelism);
HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();

Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
.map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
105 changes: 90 additions & 15 deletions hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -22,8 +22,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
@@ -597,7 +600,7 @@ public void testKeepLatestFileVersionsMOR() throws IOException {
*/
@Test
public void testKeepLatestCommits() throws IOException {
testKeepLatestCommits(false);
testKeepLatestCommits(false, false);
}

/**
@@ -606,15 +609,24 @@ public void testKeepLatestCommits() throws IOException {
*/
@Test
public void testKeepLatestCommitsWithFailureRetry() throws IOException {
testKeepLatestCommits(true);
testKeepLatestCommits(true, false);
}

/**
* Test HoodieTable.clean() cleaning by commit logic for MOR table with log files, with incremental cleaning mode enabled.
*/
@Test
public void testKeepLatestCommitsIncrMode() throws IOException {
testKeepLatestCommits(false, true);
}

/**
* Test HoodieTable.clean() cleaning by commit logic for MOR table with log files.
*/
private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOException {
private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();

@@ -626,6 +638,16 @@ private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOExcept
String file1P1C0 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");

HoodieCommitMetadata commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
new ImmutableList.Builder<>().add(file1P0C0).build())
.put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
new ImmutableList.Builder<>().add(file1P1C0).build())
.build());
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));

metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

@@ -647,12 +669,24 @@ private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOExcept
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

String file2P0C1 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
String file2P1C1 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update

HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
new ImmutableList.Builder<>().add(file1P0C0).add(file2P0C1).build())
.put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
new ImmutableList.Builder<>().add(file1P1C0).add(file2P1C1).build())
.build());
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
assertEquals("Must not clean any files", 0,
getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
@@ -674,11 +708,21 @@ private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOExcept
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
String file3P0C2 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");

commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
new ImmutableList.Builder<>().add(file1P0C0).add(file2P0C1).add(file3P0C2).build())
.build());
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));

List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
assertEquals("Must not clean any file. We have to keep 1 version before the latest commit time to keep", 0,
getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
@@ -692,10 +736,19 @@ private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOExcept
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update
String file4P0C3 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003");
commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
new ImmutableList.Builder<>().add(file1P0C0).add(file2P0C1).add(file4P0C3).build())
.build());
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));

List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry);
assertEquals("Must not clean one old file", 1,
@@ -718,11 +771,19 @@ private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOExcept
file4P0C3));

// No cleaning on partially written file, with no commit.
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
new ImmutableList.Builder<>().add(file3P0C2).build())
.build());
metaClient.getActiveTimeline().saveToInflight(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "004"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
assertEquals("Must not clean any files", 0,
getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
.size());
cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0);
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
@@ -992,4 +1053,18 @@ private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final Hoo
});
return Stream.concat(stream1, stream2);
}

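/**
* Builds commit metadata from a partition-path to file-paths mapping. For test simplicity,
* each file's path doubles as its fileId.
*/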
private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths) {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
partitionToFilePaths.entrySet().forEach(e -> {
e.getValue().forEach(f -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(e.getKey());
writeStat.setPath(f);
writeStat.setFileId(f);
metadata.addWriteStat(e.getKey(), writeStat);
});
});
return metadata;
}
}
@@ -368,7 +368,7 @@ JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime,
* @return Cleaner state corresponding to partition path
*/
HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().get();
return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
}

/**