diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index 8347da6014af8..6ff063f49d445 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -111,11 +111,6 @@ protected Map<String, String> getStrategyParams() {
     return params;
   }
 
-  @Override
-  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    return partitionPaths;
-  }
-
   @Override
   protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
     return super.getFileSlicesEligibleForClustering(partition)
@@ -123,7 +118,4 @@ protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String part
         .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
   }
 
-  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
-    return (int) Math.ceil(groupSize / (double) targetFileSize);
-  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 98ee48872be8e..9ba04438293b9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -668,16 +668,16 @@ private FlinkOptions() {
           + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['"
           + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '" + PARTITION_FILTER_END_PARTITION.key() + "'].");
 
-  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+  public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
       .key("clustering.plan.strategy.target.file.max.bytes")
-      .intType()
-      .defaultValue(1024 * 1024 * 1024) // default 1 GB
+      .longType()
+      .defaultValue(1024 * 1024 * 1024L) // default 1 GB
      .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
 
-  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+  public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
       .key("clustering.plan.strategy.small.file.limit")
-      .intType()
-      .defaultValue(600) // default 600 MB
+      .longType()
+      .defaultValue(600L) // default 600 MB
       .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
 
   public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
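The `intType()` to `longType()` change above is the crux of this hunk: a byte-denominated size no longer fits in an `int` once the target crosses `Integer.MAX_VALUE` (about 2.1 GB). A minimal standalone sketch (not part of the patch) of the silent wrap-around an int-typed byte size would allow:

```java
public class ByteSizeOverflow {
  public static void main(String[] args) {
    // 1 GB still fits in an int, which is why the old default "worked".
    int oneGb = 1024 * 1024 * 1024;
    System.out.println(oneGb);              // 1073741824

    // 3 GB as an all-int expression overflows silently: every operand is
    // an int, so the multiplication wraps before any assignment happens.
    int threeGbWrapped = 3 * 1024 * 1024 * 1024;
    System.out.println(threeGbWrapped);     // -1073741824

    // With a long-typed option the same target is representable.
    long threeGb = 3 * 1024 * 1024 * 1024L; // the trailing L promotes the math to long
    System.out.println(threeGb);            // 3221225472
  }
}
```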
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 85e09cbc198a5..97b0bd3bc9956 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
@@ -164,6 +165,12 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
     this.table.getMetaClient().reloadActiveTimeline();
     this.writeClient.completeTableService(
         TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
+
+    // whether to clean up the input base parquet files used for clustering
+    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+      LOG.info("Running inline clean");
+      this.writeClient.clean();
+    }
   }
 
   private void reset(String instant) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 31db54b361b40..9b38f0ceeaf02 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -130,6 +131,10 @@ public ClusteringOperator(Configuration conf, RowType rowType) {
     this.rowType = rowType;
     this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
     this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
+
+    // override max parquet file size in conf
+    this.conf.setLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
+        this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES));
  }
 
   @Override
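Taken together, the two hunks above make the commit sink run an inline clean when async cleaning is off, and make the clustering operator forward the clustering target size into the parquet writer, so files actually roll at the planned size. A minimal sketch of that conf override, with the option keys inlined as string literals for self-containment (in the patch they come from `HoodieStorageConfig.PARQUET_MAX_FILE_SIZE` and `FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES`):

```java
import org.apache.flink.configuration.Configuration;

public class ClusteringConfSketch {
  // Key names as used by the patch; the real constants live in Hudi.
  static final String PARQUET_MAX_FILE_SIZE = "hoodie.parquet.max.file.size";
  static final String CLUSTERING_TARGET_FILE_MAX_BYTES = "clustering.plan.strategy.target.file.max.bytes";

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // User asks for 2 GB output files from clustering.
    conf.setLong(CLUSTERING_TARGET_FILE_MAX_BYTES, 2L * 1024 * 1024 * 1024);

    // Mirror of what ClusteringOperator's constructor now does: without the
    // override the parquet writer would keep rolling files at its own
    // (smaller) default size and the plan's target would never be reached.
    conf.setLong(PARQUET_MAX_FILE_SIZE,
        conf.getLong(CLUSTERING_TARGET_FILE_MAX_BYTES, 1024L * 1024 * 1024));

    System.out.println(conf.getLong(PARQUET_MAX_FILE_SIZE, -1L)); // 2147483648
  }
}
```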
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
index 74fa73c3f935b..899edb5a683de 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -18,14 +18,24 @@
 
 package org.apache.hudi.sink.clustering;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
+import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Configurations for Hoodie Flink clustering.
  */
@@ -69,13 +79,14 @@ public class FlinkClusteringConfig extends Configuration {
       required = false)
   public Integer archiveMaxCommits = 30;
 
-  @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the clustering plan in this job.\n"
-      + "There is a risk of losing data when scheduling clustering outside the writer job.\n"
-      + "Scheduling clustering in the writer job and only let this job do the clustering execution is recommended.\n"
-      + "Default is true", required = false)
-  public Boolean schedule = true;
+  @Parameter(names = {"--schedule", "-sc"}, description = "Schedule the clustering plan in this job.\n"
+      + "Default is false", required = false)
+  public Boolean schedule = false;
+
+  @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time")
+  public String clusteringInstantTime = null;
 
-  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to clean up the old commits immediately on new commits, disabled by default", required = false)
   public Boolean cleanAsyncEnable = false;
 
   @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
@@ -85,10 +96,10 @@ public class FlinkClusteringConfig extends Configuration {
   public String planPartitionFilterMode = "NONE";
 
   @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
-  public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
+  public Long targetFileMaxBytes = 1024 * 1024 * 1024L;
 
   @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
-  public Integer smallFileLimit = 600;
+  public Long smallFileLimit = 600L;
 
   @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
   public Integer skipFromLatestPartitions = 0;
@@ -116,6 +127,33 @@ public class FlinkClusteringConfig extends Configuration {
       description = "Min clustering interval of async clustering service, default 10 minutes")
   public Integer minClusteringIntervalSeconds = 600;
 
+  @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+      + "(using the CLI parameter \"--props\") can also be passed through command line using this parameter.")
+  public List<String> configs = new ArrayList<>();
+
+  @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+      + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+      + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer "
+      + "to individual classes, for supported properties.")
+  public String propsFilePath = "";
+
+  public static TypedProperties buildProperties(List<String> props) {
+    TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
+    props.forEach(x -> {
+      String[] kv = x.split("=");
+      ValidationUtils.checkArgument(kv.length == 2);
+      properties.setProperty(kv[0], kv[1]);
+    });
+    return properties;
+  }
+
+  public static TypedProperties getProps(FlinkClusteringConfig cfg) {
+    return cfg.propsFilePath.isEmpty()
+        ? buildProperties(cfg.configs)
+        : StreamerUtil.readConfig(HadoopConfigurations.getHadoopConf(cfg),
+            new Path(cfg.propsFilePath), cfg.configs).getProps();
+  }
+
   /**
    * Transforms a {@code FlinkClusteringConfig.config} into {@code Configuration}.
    * The latter is more suitable for the table APIs. It reads all the properties
@@ -123,7 +161,8 @@ public class FlinkClusteringConfig extends Configuration {
    * (set by `--hoodie-conf` option).
    */
   public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
-    Configuration conf = new Configuration();
+    Map<String, String> propsMap = new HashMap<>((Map) getProps(config));
+    org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
 
     conf.setString(FlinkOptions.PATH, config.path);
     conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
@@ -134,8 +173,8 @@ public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
     conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
     conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
     conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode);
-    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
-    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
+    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
+    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
     conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
     conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
     conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups);
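The new `buildProperties` helper parses repeated `--hoodie-conf key=value` arguments. A self-contained re-implementation of just the parsing loop (dropping the Hudi-specific `TypedProperties` and global-props plumbing) to show its behavior:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class HoodieConfParsing {
  // Stand-alone sketch of the loop inside FlinkClusteringConfig#buildProperties.
  static Properties parse(List<String> keyValues) {
    Properties props = new Properties();
    for (String kv : keyValues) {
      String[] parts = kv.split("=");
      if (parts.length != 2) {
        // buildProperties rejects this via ValidationUtils.checkArgument
        throw new IllegalArgumentException("Expected key=value, got: " + kv);
      }
      props.setProperty(parts[0], parts[1]);
    }
    return props;
  }

  public static void main(String[] args) {
    // Typical --hoodie-conf repetitions from the CLI.
    Properties props = parse(Arrays.asList(
        "hoodie.metadata.enable=false",
        "hoodie.cleaner.commits.retained=10"));
    System.out.println(props.getProperty("hoodie.cleaner.commits.retained")); // 10
  }
}
```

Note one consequence of `split("=")`: a value that itself contains `=` splits into three parts and fails validation, so such values cannot be passed through `--hoodie-conf`.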
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index d29f1f9a49c6c..63b4588cf0bdc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.clustering;
 
+import org.apache.flink.client.deployment.application.ApplicationExecutionException;
 import org.apache.hudi.async.HoodieAsyncTableService;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -30,6 +31,7 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.ClusteringUtil;
@@ -53,7 +55,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
 /**
  * Flink hudi clustering program that can be executed manually.
@@ -62,6 +63,8 @@ public class HoodieFlinkClusteringJob {
 
   protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
 
+  private static final String NO_EXECUTE_KEYWORD = "no execute";
+
   /**
    * Flink Execution Environment.
    */
@@ -99,6 +102,13 @@ public void start(boolean serviceMode) throws Exception {
       LOG.info("Hoodie Flink Clustering running only single round");
       try {
         clusteringScheduleService.cluster();
+      } catch (ApplicationExecutionException aee) {
+        if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
+          LOG.info("Clustering is not performed");
+        } else {
+          LOG.error("Got error trying to perform clustering. Shutting down", aee);
+          throw aee;
+        }
       } catch (Exception e) {
         LOG.error("Got error running delta sync once. Shutting down", e);
         throw e;
@@ -126,6 +136,7 @@ public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) {
    * Schedules clustering in service.
    */
   public static class AsyncClusteringService extends HoodieAsyncTableService {
+
     private static final long serialVersionUID = 1L;
 
     /**
@@ -202,6 +213,12 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
         try {
           cluster();
           Thread.sleep(cfg.minClusteringIntervalSeconds * 1000);
+        } catch (ApplicationExecutionException aee) {
+          if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
+            LOG.info("Clustering is not performed.");
+          } else {
+            throw new HoodieException(aee.getMessage(), aee);
+          }
         } catch (Exception e) {
           LOG.error("Shutting down clustering service due to exception", e);
           error = true;
@@ -215,14 +232,28 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
       }, executor), executor);
     }
 
+    /**
+     * Follows the same execution methodology of HoodieFlinkCompactor, where only one clustering job is allowed to be
+     * executed at any point in time.
+     * <p>
+     * If there is an inflight clustering job, it will be rolled back and re-attempted.
+     * <p>
+     * A clustering plan will be generated if `schedule` is true.
+     *
+     * @throws Exception
+     * @see HoodieFlinkCompactor
+     */
     private void cluster() throws Exception {
       table.getMetaClient().reloadActiveTimeline();
-      // judges whether there are operations
-      // to compute the clustering instant time and exec clustering.
       if (cfg.schedule) {
+        // create a clustering plan on the timeline
         ClusteringUtil.validateClusteringScheduling(conf);
-        String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+        String clusteringInstantTime = cfg.clusteringInstantTime != null ? cfg.clusteringInstantTime
+            : HoodieActiveTimeline.createNewInstantTime();
+
+        LOG.info("Creating a clustering plan for instant [" + clusteringInstantTime + "]");
         boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
         if (!scheduled) {
           // do nothing.
@@ -233,17 +264,27 @@ private void cluster() throws Exception {
       }
 
       // fetch the instant based on the configured execution sequence
-      List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream()
-          .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).collect(Collectors.toList());
+      List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
       if (instants.isEmpty()) {
         // do nothing.
         LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
         return;
       }
 
-      HoodieInstant clusteringInstant = CompactionUtil.isLIFO(cfg.clusteringSeq) ? instants.get(instants.size() - 1) : instants.get(0);
+      final HoodieInstant clusteringInstant;
+      if (cfg.clusteringInstantTime != null) {
+        clusteringInstant = instants.stream()
+            .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
+            .findFirst()
+            .orElseThrow(() -> new HoodieException("Clustering instant [" + cfg.clusteringInstantTime + "] not found"));
+      } else {
+        // check for inflight clustering plans and roll them back if required
+        clusteringInstant =
+            CompactionUtil.isLIFO(cfg.clusteringSeq) ? instants.get(instants.size() - 1) : instants.get(0);
+      }
 
-      HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
+      HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(
+          clusteringInstant.getTimestamp());
       if (table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
         LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
         table.rollbackInflightClustering(inflightInstant,
@@ -266,7 +307,7 @@ private void cluster() throws Exception {
 
       if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
           || (clusteringPlan.getInputGroups().isEmpty())) {
-        // No clustering plan, do nothing and return.
+        // no clustering plan, do nothing and return.
         LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
         return;
       }
@@ -279,7 +320,6 @@ private void cluster() throws Exception {
       // exceptionally.
 
       // clean the clustering plan in auxiliary path and cancels the clustering.
-
       LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
           + "Clean the clustering plan in auxiliary path and cancels the clustering");
       CompactionUtil.cleanInstant(table.getMetaClient(), instant);
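Both table-service jobs now share the same "benign no-op" convention seen above and in the compactor diff below: if the Flink client reports that the job's `main()` finished without calling `execute()` (the exception message contains `no execute`), the round is logged and skipped rather than treated as a failure. A stand-alone sketch of that pattern, using a plain `RuntimeException` as a stand-in for Flink's `ApplicationExecutionException`:

```java
public class NoExecuteHandling {
  static final String NO_EXECUTE_KEYWORD = "no execute";

  static void runOnce(Runnable tableService) {
    try {
      tableService.run();
    } catch (RuntimeException e) {
      if (e.getMessage() != null && e.getMessage().contains(NO_EXECUTE_KEYWORD)) {
        // Benign: there was nothing to cluster/compact, so no pipeline ran.
        System.out.println("Nothing to do, skipping this round");
      } else {
        throw e; // real failures still propagate
      }
    }
  }

  public static void main(String[] args) {
    runOnce(() -> {
      throw new RuntimeException("Job was submitted but no execute() was called");
    });
  }
}
```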
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 92c73e2b9f5f1..12f16e1fd8f03 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.flink.client.deployment.application.ApplicationExecutionException;
 import org.apache.hudi.async.HoodieAsyncTableService;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -57,6 +58,8 @@ public class HoodieFlinkCompactor {
 
   protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
 
+  private static final String NO_EXECUTE_KEYWORD = "no execute";
+
   /**
    * Flink Execution Environment.
    */
@@ -94,6 +97,12 @@ public void start(boolean serviceMode) throws Exception {
       LOG.info("Hoodie Flink Compactor running only single round");
       try {
         compactionScheduleService.compact();
+      } catch (ApplicationExecutionException aee) {
+        if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
+          LOG.info("Compaction is not performed");
+        } else {
+          throw aee;
+        }
       } catch (Exception e) {
         LOG.error("Got error running delta sync once. Shutting down", e);
         throw e;
@@ -121,6 +130,7 @@ public static FlinkCompactionConfig getFlinkCompactionConfig(String[] args) {
    * Schedules compaction in service.
    */
   public static class AsyncCompactionService extends HoodieAsyncTableService {
+
     private static final long serialVersionUID = 1L;
 
     /**
@@ -193,6 +203,12 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
         try {
           compact();
           Thread.sleep(cfg.minCompactionIntervalSeconds * 1000);
+        } catch (ApplicationExecutionException aee) {
+          if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
+            LOG.info("Compaction is not performed.");
+          } else {
+            throw new HoodieException(aee.getMessage(), aee);
+          }
         } catch (Exception e) {
           LOG.error("Shutting down compaction service due to exception", e);
           error = true;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b09d7ad8bfc37..9df728148d07b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -176,8 +176,8 @@ public static HoodieWriteConfig getHoodieClientConfig(
             ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
         .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
         .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
-        .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
-        .withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L)
+        .withClusteringTargetFileMaxBytes(conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
+        .withClusteringPlanSmallFileLimit(conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L)
         .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
         .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
         .build())
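One detail worth noting in the `StreamerUtil` hunk: the two clustering size options use asymmetric units. `clustering.plan.strategy.target.file.max.bytes` is raw bytes, while `clustering.plan.strategy.small.file.limit` is megabytes and is only converted to bytes here (note the `* 1024 * 1024L`). A small sketch of the effective values under the defaults:

```java
public class ClusteringSizeUnits {
  public static void main(String[] args) {
    // Values exactly as the option defaults above: one raw bytes, one MB.
    long targetFileMaxBytes = 1024L * 1024 * 1024; // clustering.plan.strategy.target.file.max.bytes
    long smallFileLimitMb = 600L;                  // clustering.plan.strategy.small.file.limit

    // StreamerUtil converts only the small-file limit, so users must set
    // the two options in different units on the CLI / Flink conf.
    long smallFileLimitBytes = smallFileLimitMb * 1024 * 1024L;

    System.out.println("target file size = " + targetFileMaxBytes + " bytes");  // 1073741824
    System.out.println("small file limit = " + smallFileLimitBytes + " bytes"); // 629145600
  }
}
```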