diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index d1d0e6726173b..3b61d49727290 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -106,6 +106,12 @@ public class HoodieCompactionConfig extends HoodieConfig { .withDocumentation("Only if the log file size is greater than the threshold in bytes," + " the file group will be compacted."); + public static final ConfigProperty<Long> COMPACTION_LOG_FILE_NUM_THRESHOLD = ConfigProperty + .key("hoodie.compaction.logfile.num.threshold") + .defaultValue(0L) + .withDocumentation("Only if the number of log files is greater than or equal to the threshold," + + " the file group will be compacted."); + public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty .key("hoodie.compaction.strategy") .defaultValue(LogFileSizeBasedCompactionStrategy.class.getName()) @@ -381,6 +387,11 @@ public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold return this; } + public Builder withCompactionLogFileNumThreshold(int logFileNumThreshold) { + compactionConfig.setValue(COMPACTION_LOG_FILE_NUM_THRESHOLD, String.valueOf(logFileNumThreshold)); + return this; + } + public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) { compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 90a468368f1ae..78c8d677523a2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -199,10 +199,10 @@ public class HoodieWriteConfig extends HoodieConfig { + "before writing records to the table."); public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty - .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns") - .noDefaultValue() - .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. " - + "For example 'column1,column2'"); + .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns") + .noDefaultValue() + .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. " + + "For example 'column1,column2'"); public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty .key("hoodie.bulkinsert.user.defined.partitioner.class") @@ -1275,6 +1275,10 @@ public Long getCompactionLogFileSizeThreshold() { return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD); } + public Long getCompactionLogFileNumThreshold() { + return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_NUM_THRESHOLD); + } + public Boolean getCompactionLazyBlockReadEnabled() { return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java new file mode 100644 index 0000000000000..6f79b684d0a46 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java @@ -0,0 +1,49 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact.strategy; + +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * LogFileNumBasedCompactionStrategy orders the compactions by the number of delta log files of each file group, largest first, + * keeps only the file groups whose log file count is greater than or equal to the configured threshold and limits the compactions within a configured IO bound.
+ */ +public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrategy + implements Comparator<HoodieCompactionOperation> { + + @Override + public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + Long numThreshold = writeConfig.getCompactionLogFileNumThreshold(); + List<HoodieCompactionOperation> filterOperator = operations.stream() + .filter(e -> e.getDeltaFilePaths().size() >= numThreshold) + .sorted(this).collect(Collectors.toList()); + return super.orderAndFilter(writeConfig, filterOperator, pendingCompactionPlans); + } + + @Override + public int compare(HoodieCompactionOperation hco1, HoodieCompactionOperation hco2) { + return Integer.compare(hco2.getDeltaFilePaths().size(), hco1.getDeltaFilePaths().size()); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index 0c7190092e730..319d6ea031eb1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -49,8 +49,8 @@ public class TestHoodieCompactionStrategy { private static final long MB = 1024 * 1024L; - private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"}; private static final Random RANDOM = new Random(); + private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"}; @Test public void testUnBounded() { @@ -76,7 +76,7 @@ public void testBoundedIOSimple() { sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( -
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) .build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); @@ -99,8 +99,8 @@ public void testLogFileSizeCompactionSimple() { sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( - HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205) - .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build()) + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205) + .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build()) .build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); @@ -123,7 +123,7 @@ public void testDayBasedCompactionSimple() { sizesMap.put(100 * MB, Collections.singletonList(MB)); sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); - Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() { + Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() { { put(120 * MB, partitionPaths[2]); put(110 * MB, partitionPaths[2]); @@ -169,7 +169,7 @@ public void testBoundedPartitionAwareCompactionSimple() { String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1)); String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5)); - Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() { + Map<Long, String>
keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() { { put(120 * MB, currentDay); put(110 * MB, currentDayMinus1); @@ -218,7 +218,7 @@ public void testUnboundedPartitionAwareCompactionSimple() { String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1)); String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5)); - Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() { + Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() { { put(120 * MB, currentDay); put(110 * MB, currentDayMinus1); @@ -243,8 +243,44 @@ public void testUnboundedPartitionAwareCompactionSimple() { "BoundedPartitionAwareCompactionStrategy should have resulted in 1 compaction"); } + @Test + public void testLogFileLengthBasedCompactionStrategy() { + Map<Long, List<Long>> sizesMap = new HashMap<>(); + sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); + sizesMap.put(110 * MB, new ArrayList<>()); + sizesMap.put(100 * MB, Collections.singletonList(2048 * MB)); + sizesMap.put(90 * MB, Arrays.asList(512 * MB, 512 * MB)); + LogFileNumBasedCompactionStrategy strategy = new LogFileNumBasedCompactionStrategy(); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1024) + .withCompactionLogFileNumThreshold(2).build()) + .build(); + List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); + List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + + assertTrue(returned.size() < operations.size(), + "LogFileNumBasedCompactionStrategy should have resulted in fewer compactions"); + assertEquals(2, returned.size(), "LogFileNumBasedCompactionStrategy should have resulted in 2 compaction"); + + // Total number of delta log files across the returned operations + Integer allFileLength = returned.stream().map(s ->
s.getDeltaFilePaths().size()) + .reduce(Integer::sum).orElse(0); + + assertEquals(5, allFileLength); + assertEquals(3, returned.get(0).getDeltaFilePaths().size()); + assertEquals(2, returned.get(1).getDeltaFilePaths().size()); + // Total size of all the log files + Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB)) + .map(Double::longValue).reduce(Long::sum).orElse(0L); + // TOTAL_IO_MB: (120 + 90) * 2 + 512 + 512 + 60 + 10 + 80 + assertEquals(1594, (long) returnedSize, + "Should chose the first 2 compactions which should result in a total IO of 1594 MB"); + + + } + private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config, - Map<Long, List<Long>> sizesMap) { + Map<Long, List<Long>> sizesMap) { Map<Long, String> keyToPartitionMap = sizesMap.keySet().stream() .map(e -> Pair.of(e, partitionPaths[RANDOM.nextInt(partitionPaths.length - 1)])) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); @@ -252,7 +288,7 @@ private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteCo } private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config, - Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) { + Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) { List<HoodieCompactionOperation> operations = new ArrayList<>(sizesMap.size()); sizesMap.forEach((k, v) -> {