@@ -111,19 +111,11 @@ protected Map<String, String> getStrategyParams() {
return params;
}

@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
return partitionPaths;
}

@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
return super.getFileSlicesEligibleForClustering(partition)
// Only files that have basefile size smaller than small file size are eligible.
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
}

private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
return (int) Math.ceil(groupSize / (double) targetFileSize);
}
}
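
For reference, the output-file-group sizing that the removed override performed is a plain ceiling division over the target file size. The sketch below is illustrative only; it mirrors the deleted method rather than adding anything to the PR.

// Illustrative sketch (mirrors the removed method above, not new PR code):
// number of output file groups for a clustering group of a given total size.
private static int numberOfOutputFileGroups(long groupSizeBytes, long targetFileSizeBytes) {
  return (int) Math.ceil(groupSizeBytes / (double) targetFileSizeBytes);
}
// e.g. a 2.5 GB group with a 1 GB target file size yields 3 output file groups.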
@@ -668,16 +668,16 @@ private FlinkOptions() {
+ "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
+ PARTITION_FILTER_END_PARTITION.key() + "'].");

public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
.key("clustering.plan.strategy.target.file.max.bytes")
.intType()
.defaultValue(1024 * 1024 * 1024) // default 1 GB
.longType()
.defaultValue(1024 * 1024 * 1024L) // default 1 GB
.withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");

public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
.key("clustering.plan.strategy.small.file.limit")
.intType()
.defaultValue(600) // default 600 MB
.longType()
.defaultValue(600L) // default 600 MB
.withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");

public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
@@ -31,6 +31,7 @@
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
@@ -164,6 +165,12 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
this.table.getMetaClient().reloadActiveTimeline();
this.writeClient.completeTableService(
TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);

// whether to clean up the input base parquet files used for clustering
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
LOG.info("Running inline clean");
this.writeClient.clean();
}
}
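
A hedged configuration sketch (illustrative, using the same FlinkOptions.CLEAN_ASYNC_ENABLED flag referenced above): with async cleaning left disabled, this sink runs an inline clean right after each clustering commit, so the input base files replaced by clustering can be removed by the cleaner.

// Sketch (typical job configuration assumed, not taken from the diff):
Configuration conf = new Configuration();
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, false); // doCommit() above then calls writeClient.clean() inline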

private void reset(String instant) {
@@ -31,6 +31,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
@@ -130,6 +131,10 @@ public ClusteringOperator(Configuration conf, RowType rowType) {
this.rowType = rowType;
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);

// override max parquet file size in conf
this.conf.setLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES));
}
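
A hedged read-back sketch (assumption: downstream writers fetch the value by key, falling back to the clustering target): the override above simply makes the Hudi parquet size limit track the clustering target file size for this operator.

// Sketch only: reading back the value set in the constructor above.
long maxParquetBytes = this.conf.getLong(
    HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
    this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES));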

@Override
@@ -18,14 +18,24 @@

package org.apache.hudi.sink.clustering;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.StreamerUtil;

import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Configurations for Hoodie Flink clustering.
*/
@@ -69,13 +79,14 @@ public class FlinkClusteringConfig extends Configuration {
required = false)
public Integer archiveMaxCommits = 30;

@Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the clustering plan in this job.\n"
+ "There is a risk of losing data when scheduling clustering outside the writer job.\n"
+ "Scheduling clustering in the writer job and only let this job do the clustering execution is recommended.\n"
+ "Default is true", required = false)
public Boolean schedule = true;
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule the clustering plan in this job.\n"
+ "Default is false", required = false)
public Boolean schedule = false;

@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time")
public String clusteringInstantTime = null;
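
A hedged usage sketch of the flags above (the table path is a made-up placeholder): with the new default, a standalone run schedules nothing unless --schedule is passed, and --instant-time can pin the exact plan to execute.

// Sketch (illustrative arguments only):
String[] args = {"--path", "hdfs:///tmp/hudi_table", "--schedule"};
FlinkClusteringConfig cfg = HoodieFlinkClusteringJob.getFlinkClusteringConfig(args);
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);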

@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, disabled by default", required = false)
public Boolean cleanAsyncEnable = false;

@Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
@@ -85,10 +96,10 @@ public class FlinkClusteringConfig extends Configuration {
public String planPartitionFilterMode = "NONE";

@Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
public Long targetFileMaxBytes = 1024 * 1024 * 1024L;

@Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
public Integer smallFileLimit = 600;
public Long smallFileLimit = 600L;

@Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
public Integer skipFromLatestPartitions = 0;
@@ -116,14 +127,42 @@ public class FlinkClusteringConfig extends Configuration {
description = "Min clustering interval of async clustering service, default 10 minutes")
public Integer minClusteringIntervalSeconds = 600;

@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed through command line using this parameter.")
public List<String> configs = new ArrayList<>();

@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
public String propsFilePath = "";

public static TypedProperties buildProperties(List<String> props) {
TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
Contributor: We can also move this class into StreamerUtil, and please file another JIRA to refactor the readConfig and buildProperties methods into DFSPropertiesConfiguration.

Member Author: Understood. I rewrote this method because the one in StreamerUtil is buggy and ignores parameters passed in via --hoodie-conf when --props is empty.

props.forEach(x -> {
String[] kv = x.split("=");
ValidationUtils.checkArgument(kv.length == 2);
properties.setProperty(kv[0], kv[1]);
});
return properties;
}
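
A hedged usage sketch of buildProperties (the keys are illustrative Hudi properties, not mandated by this PR): each --hoodie-conf entry is a single key=value pair layered on top of the global default properties.

// Sketch (illustrative keys; assumes java.util.Arrays is imported):
List<String> cliProps = Arrays.asList(
    "hoodie.parquet.max.file.size=1073741824",
    "hoodie.clustering.plan.strategy.small.file.limit=629145600");
TypedProperties props = buildProperties(cliProps);
// Each entry must contain exactly one '=', otherwise the checkArgument above fails.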

public static TypedProperties getProps(FlinkClusteringConfig cfg) {
return cfg.propsFilePath.isEmpty()
? buildProperties(cfg.configs)
: StreamerUtil.readConfig(HadoopConfigurations.getHadoopConf(cfg),
new Path(cfg.propsFilePath), cfg.configs).getProps();
}

/**
* Transforms a {@code FlinkClusteringConfig.config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
* in the properties file (set by `--props` option) and cmd line options
* (set by `--hoodie-conf` option).
*/
public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
Configuration conf = new Configuration();
Map<String, String> propsMap = new HashMap<String, String>((Map) getProps(config));
org.apache.flink.configuration.Configuration conf = fromMap(propsMap);

conf.setString(FlinkOptions.PATH, config.path);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
@@ -134,8 +173,8 @@ public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups);
@@ -18,6 +18,7 @@

package org.apache.hudi.sink.clustering;

import org.apache.flink.client.deployment.application.ApplicationExecutionException;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -30,6 +31,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ClusteringUtil;
@@ -53,7 +55,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
* Flink hudi clustering program that can be executed manually.
@@ -62,6 +63,8 @@ public class HoodieFlinkClusteringJob {

protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);

private static final String NO_EXECUTE_KEYWORD = "no execute";
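
A hedged helper sketch (not in the PR): the keyword above is matched against the message of Flink's ApplicationExecutionException, which is raised when the job's main() never calls execute(), meaning nothing was scheduled or executed; the catch blocks below downgrade that case to an info log.

// Sketch only: the guard that the catch blocks below apply inline.
private static boolean isNoExecuteError(ApplicationExecutionException e) {
  return e.getMessage() != null && e.getMessage().contains(NO_EXECUTE_KEYWORD);
}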

/**
* Flink Execution Environment.
*/
@@ -99,6 +102,13 @@ public void start(boolean serviceMode) throws Exception {
LOG.info("Hoodie Flink Clustering running only single round");
try {
clusteringScheduleService.cluster();
} catch (ApplicationExecutionException aee) {
if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
LOG.info("Clustering is not performed");
} else {
LOG.error("Got error trying to perform clustering. Shutting down", aee);
throw aee;
}
} catch (Exception e) {
LOG.error("Got error running delta sync once. Shutting down", e);
throw e;
@@ -126,6 +136,7 @@ public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) {
* Schedules clustering in service.
*/
public static class AsyncClusteringService extends HoodieAsyncTableService {

private static final long serialVersionUID = 1L;

/**
@@ -202,6 +213,12 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
try {
cluster();
Thread.sleep(cfg.minClusteringIntervalSeconds * 1000);
} catch (ApplicationExecutionException aee) {
if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
LOG.info("Clustering is not performed.");
} else {
throw new HoodieException(aee.getMessage(), aee);
}
} catch (Exception e) {
LOG.error("Shutting down clustering service due to exception", e);
error = true;
@@ -215,14 +232,28 @@
}, executor), executor);
}

/**
* Follows the same execution methodology as HoodieFlinkCompactor, where only one clustering job is allowed to be
* executed at any point in time.
* <p>
* If there is an inflight clustering job, it will be rolled back and re-attempted.
* <p>
* A clustering plan will be generated if `schedule` is true.
*
* @throws Exception
* @see HoodieFlinkCompactor
*/
private void cluster() throws Exception {
table.getMetaClient().reloadActiveTimeline();

// judges whether there are operations
// to compute the clustering instant time and exec clustering.
if (cfg.schedule) {
// create a clustering plan on the timeline
ClusteringUtil.validateClusteringScheduling(conf);
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();

String clusteringInstantTime = cfg.clusteringInstantTime != null ? cfg.clusteringInstantTime
: HoodieActiveTimeline.createNewInstantTime();

LOG.info("Creating a clustering plan for instant [" + clusteringInstantTime + "]");
boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
@@ -233,17 +264,27 @@ private void cluster() throws Exception {
}

// fetch the instant based on the configured execution sequence
List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).collect(Collectors.toList());
List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
if (instants.isEmpty()) {
// do nothing.
LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
return;
}

HoodieInstant clusteringInstant = CompactionUtil.isLIFO(cfg.clusteringSeq) ? instants.get(instants.size() - 1) : instants.get(0);
final HoodieInstant clusteringInstant;
if (cfg.clusteringInstantTime != null) {
clusteringInstant = instants.stream()
.filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
.findFirst()
.orElseThrow(() -> new HoodieException("Clustering instant [" + cfg.clusteringInstantTime + "] not found"));
} else {
// check for inflight clustering plans and roll them back if required
clusteringInstant =
CompactionUtil.isLIFO(cfg.clusteringSeq) ? instants.get(instants.size() - 1) : instants.get(0);
}

HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(
clusteringInstant.getTimestamp());
if (table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
table.rollbackInflightClustering(inflightInstant,
@@ -266,7 +307,7 @@ private void cluster() throws Exception {

if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
|| (clusteringPlan.getInputGroups().isEmpty())) {
// No clustering plan, do nothing and return.
// no clustering plan, do nothing and return.
LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
return;
}
@@ -279,7 +320,6 @@ private void cluster() throws Exception {
// exceptionally.

// clean the clustering plan in the auxiliary path and cancel the clustering.

LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the clustering plan in auxiliary path and cancels the clustering");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
@@ -18,6 +18,7 @@

package org.apache.hudi.sink.compact;

import org.apache.flink.client.deployment.application.ApplicationExecutionException;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -57,6 +58,8 @@ public class HoodieFlinkCompactor {

protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);

private static final String NO_EXECUTE_KEYWORD = "no execute";

/**
* Flink Execution Environment.
*/
@@ -94,6 +97,12 @@ public void start(boolean serviceMode) throws Exception {
LOG.info("Hoodie Flink Compactor running only single round");
try {
compactionScheduleService.compact();
} catch (ApplicationExecutionException aee) {
if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
LOG.info("Compaction is not performed");
} else {
throw aee;
}
} catch (Exception e) {
LOG.error("Got error running delta sync once. Shutting down", e);
throw e;
@@ -121,6 +130,7 @@ public static FlinkCompactionConfig getFlinkCompactionConfig(String[] args) {
* Schedules compaction in service.
*/
public static class AsyncCompactionService extends HoodieAsyncTableService {

private static final long serialVersionUID = 1L;

/**
@@ -193,6 +203,12 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
try {
compact();
Thread.sleep(cfg.minCompactionIntervalSeconds * 1000);
} catch (ApplicationExecutionException aee) {
if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
LOG.info("Compaction is not performed.");
} else {
throw new HoodieException(aee.getMessage(), aee);
}
} catch (Exception e) {
LOG.error("Shutting down compaction service due to exception", e);
error = true;