Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
Expand Down Expand Up @@ -94,6 +95,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Filter clustering partitions that matched regex pattern");

// Explicit, comma-separated list of partition paths to run clustering on (see the split(",")
// in getMatchedPartitions). When set, it takes precedence over PARTITION_REGEX_PATTERN.
// No default value; introduced in 0.11.0.
public static final ConfigProperty<String> PARTITION_SELECTED = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "partition.selected")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Partitions to run clustering");

public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
.key("hoodie.clustering.plan.strategy.class")
.defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY)
Expand Down Expand Up @@ -473,6 +480,11 @@ public Builder withClusteringPartitionRegexPattern(String pattern) {
return this;
}

/**
 * Sets the explicit list of partitions to run clustering on.
 *
 * @param partitionSelected partitions to cluster (comma-separated — TODO confirm against
 *                          the consumer in getMatchedPartitions, which splits on ",")
 * @return this builder for chaining
 */
public Builder withClusteringPartitionSelected(String partitionSelected) {
  this.clusteringConfig.setValue(PARTITION_SELECTED, partitionSelected);
  return this;
}

public Builder withClusteringSkipPartitionsFromLatest(int clusteringSkipPartitionsFromLatest) {
clusteringConfig.setValue(PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, String.valueOf(clusteringSkipPartitionsFromLatest));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,10 @@ public long getClusteringSmallFileLimit() {
return getLong(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT);
}

/**
 * Returns the value of {@link HoodieClusteringConfig#PARTITION_SELECTED}, i.e. the
 * explicitly selected partitions for clustering, or {@code null} when unset.
 */
public String getClusteringPartitionSelected() {
  return this.getString(HoodieClusteringConfig.PARTITION_SELECTED);
}

/**
 * Returns the value of {@link HoodieClusteringConfig#PARTITION_REGEX_PATTERN}, i.e. the
 * regex used to filter clustering partitions, or {@code null} when unset.
 */
public String getClusteringPartitionFilterRegexPattern() {
  return this.getString(HoodieClusteringConfig.PARTITION_REGEX_PATTERN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Partition filter utilities. Currently, we support three mode:
Expand Down Expand Up @@ -58,11 +59,18 @@ private static List<String> recentDaysFilter(List<String> partitions, HoodieWrit
}

/**
 * Restricts the given partition paths to the configured [begin, end] range.
 *
 * <p>Both bounds are optional: each is applied only when the corresponding config value is
 * non-null, so a missing bound no longer triggers a {@code NullPointerException} inside the
 * {@code compareTo} filter. Comparison is lexicographic on the partition-path strings.
 *
 * @param partitions candidate partition paths
 * @param config     write config supplying the optional begin/end partitions
 * @return the partitions falling within the configured range
 */
private static List<String> selectedPartitionsFilter(List<String> partitions, HoodieWriteConfig config) {
  Stream<String> filteredPartitions = partitions.stream();

  // Lower bound: keep paths >= beginPartition, only when a lower bound is configured.
  String beginPartition = config.getBeginPartitionForClustering();
  if (beginPartition != null) {
    filteredPartitions = filteredPartitions.filter(path -> path.compareTo(beginPartition) >= 0);
  }

  // Upper bound: keep paths <= endPartition, only when an upper bound is configured.
  String endPartition = config.getEndPartitionForClustering();
  if (endPartition != null) {
    filteredPartitions = filteredPartitions.filter(path -> path.compareTo(endPartition) <= 0);
  }

  return filteredPartitions.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -72,8 +73,8 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
HoodieWriteConfig config = getWriteConfig();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());

// get regex matched partitions if set
partitionPaths = getRegexPatternMatchedPartitions(config, partitionPaths);
// get matched partitions if set
partitionPaths = getMatchedPartitions(config, partitionPaths);
// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPaths(partitionPaths);

Expand Down Expand Up @@ -113,6 +114,15 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
.build());
}

public List<String> getMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
String partitionSelected = config.getClusteringPartitionSelected();
if (!StringUtils.isNullOrEmpty(partitionSelected)) {
return Arrays.asList(partitionSelected.split(","));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the delimiter comma by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the filtered partition is separated by comma when pruning partition, see the end of org.apache.spark.sql.hudi.command.procedures.RunClusteringProcedure#prunePartition for more detail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the filtered partition is separated by comma when pruning partition, see the end of org.apache.spark.sql.hudi.command.procedures.RunClusteringProcedure#prunePartition for more detail.

Change to a constant or set writer's option?

} else {
return getRegexPatternMatchedPartitions(config, partitionPaths);
}
}

public List<String> getRegexPatternMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
String pattern = config.getClusteringPartitionFilterRegexPattern();
if (!StringUtils.isNullOrEmpty(pattern)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testFilterPartitionPaths() {
fakeTimeBasedPartitionsPath.add("20210719");
fakeTimeBasedPartitionsPath.add("20210721");

List list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
List list = strategyTestRegexPattern.getMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
assertEquals(2, list.size());
assertTrue(list.contains("20210721"));
assertTrue(list.contains("20210723"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hudi;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand All @@ -37,6 +35,9 @@
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -333,7 +334,7 @@ private static long fileSliceSize(FileSlice fileSlice) {
return fileSlice.getBaseFile().map(BaseFile::getFileLen).orElse(0L) + logFileSize;
}

protected static final class PartitionPath {
public static final class PartitionPath {
final String path;
final Object[] values;

Expand All @@ -342,6 +343,10 @@ public PartitionPath(String path, Object[] values) {
this.values = values;
}

/**
 * Returns the relative partition path held by this {@code PartitionPath}.
 */
public String getPath() {
  return this.path;
}

Path fullPartitionPath(String basePath) {
if (!path.isEmpty()) {
return new Path(basePath, path);
Expand Down
Loading