Merged
@@ -70,9 +70,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
- String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
- String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
- String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;

@@ -85,14 +82,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
return sparkSizeBasedClassName;
- } else if (flinkRecentDaysClassName.equals(className)) {
-   config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
-   LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
-   return flinkSizeBasedClassName;
- } else if (flinkSelectedPartitionsClassName.equals(className)) {
-   config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
-   LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
-   return flinkSizeBasedClassName;
} else if (javaSelectedPartitionClassName.equals(className)) {
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
@@ -173,7 +162,7 @@ protected Map<String, Double> buildMetrics(List<FileSlice> fileSlices) {
return metrics;
}

- protected HoodieTable<T,I,K, O> getHoodieTable() {
+ protected HoodieTable<T, I, K, O> getHoodieTable() {
return this.hoodieTable;
}
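For readers skimming the hunks above: the removed branches mean the deprecated Flink strategy classes are no longer translated here, while the Spark and Java aliases still are. Below is a self-contained sketch of the translation rule — hypothetical helper and simplified config key, not code from this PR:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the legacy-name translation, assuming a simplified config key.
public class LegacyStrategyResolver {
  static final String FILTER_MODE_KEY = "hoodie.clustering.plan.partition.filter.mode"; // assumed key
  static final String SPARK_SIZE_BASED =
      "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";

  // Deprecated class name -> size-based strategy plus an explicit partition filter mode.
  public static String resolve(String className, Map<String, String> conf) {
    if (className.endsWith("SparkRecentDaysClusteringPlanStrategy")) {
      conf.put(FILTER_MODE_KEY, "RECENT_DAYS");
      return SPARK_SIZE_BASED;
    }
    if (className.endsWith("SparkSelectedPartitionsClusteringPlanStrategy")) {
      conf.put(FILTER_MODE_KEY, "SELECTED_PARTITIONS");
      return SPARK_SIZE_BASED;
    }
    return className; // not a deprecated alias: use as configured
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    String resolved = resolve(
        "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy", conf);
    System.out.println(resolved + " with mode " + conf.get(FILTER_MODE_KEY));
  }
}
```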

@@ -71,7 +71,7 @@ public Option<HoodieCompactionPlan> execute() {
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// TODO(yihua): this validation is removed for Java client used by kafka-connect. Need to revisit this.
- if (config.getEngineType() != EngineType.JAVA) {
+ if (config.getEngineType() == EngineType.SPARK) {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
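One behavioral note on this hunk (my inference, not stated in the diff): with Flink now scheduling compaction and clustering from a running pipeline, inflight instants older than a new compaction instant become legitimate there, so the strict pending-writes check is kept for Spark only. A toy restatement of the narrowed guard:

```java
// Toy restatement of the guard; names are descriptive booleans, not Hudi APIs.
public class CompactionGuardSketch {
  static boolean shouldValidatePendingWrites(boolean occEnabled, boolean lazyCleaning, boolean isSparkEngine) {
    // Before this PR the last operand was "not the Java engine"; now only Spark validates.
    return !occEnabled && !lazyCleaning && isSparkEngine;
  }
}
```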

This file was deleted.

This file was deleted.

@@ -18,7 +18,7 @@

package org.apache.hudi.configuration;

- import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
+ import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
@@ -45,6 +45,11 @@
import java.util.Map;
import java.util.Set;

+ import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
+ import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
+ import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION;
+ import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST;

/**
* Hoodie Flink config options.
*
@@ -594,6 +599,12 @@ private FlinkOptions() {
.defaultValue(false) // default false for pipeline
.withDescription("Schedule the cluster plan, default false");

+ public static final ConfigOption<Boolean> CLUSTERING_ASYNC_ENABLED = ConfigOptions
+     .key("clustering.async.enabled")
+     .booleanType()
+     .defaultValue(false) // default false for pipeline
+     .withDescription("Async Clustering, default false");

Contributor: fals -> false

Contributor Author: fixed.
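A minimal usage sketch for the new switch (hypothetical job setup; only the string key comes from this diff):

```java
import org.apache.flink.configuration.Configuration;

// Sketch: enabling the new async clustering switch on a Flink pipeline config.
public class AsyncClusteringConfigSketch {
  public static Configuration build() {
    Configuration conf = new Configuration();
    conf.setBoolean("clustering.async.enabled", true); // option added in this PR, default false
    return conf;
  }
}
```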

public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = ConfigOptions
.key("clustering.delta_commits")
.intType()
@@ -615,11 +626,22 @@ private FlinkOptions() {
public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
.key("clustering.plan.strategy.class")
.stringType()
- .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
+ .defaultValue(FlinkSizeBasedClusteringPlanStrategy.class.getName())
.withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+ "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+ CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");

+ public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
+     .key("clustering.plan.partition.filter.mode")
+     .stringType()
+     .defaultValue("NONE")
+     .withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
+         + "NONE: do not filter table partitions and thus the clustering plan will include all partitions that have clustering candidates."
+         + "RECENT_DAYS: keep a continuous range of partitions, works together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
+         + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "'."
+         + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
+         + PARTITION_FILTER_END_PARTITION.key() + "'].");
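A sketch of how a job might pick one of these modes (hypothetical setup; the companion lookback/begin/end settings are HoodieClusteringConfig keys whose Flink-side pass-through is not shown in this diff, so they are omitted):

```java
import org.apache.flink.configuration.Configuration;

// Sketch: selecting a partition filter mode for clustering plan creation.
public class PartitionFilterModeSketch {
  public static Configuration recentDays() {
    Configuration conf = new Configuration();
    conf.setString("clustering.plan.partition.filter.mode", "RECENT_DAYS");
    return conf;
  }
}
```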

public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
.key("clustering.plan.strategy.target.file.max.bytes")
.intType()
@@ -641,7 +663,7 @@ private FlinkOptions() {
public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions
.key("clustering.plan.strategy.sort.columns")
.stringType()
- .noDefaultValue()
+ .defaultValue("")
.withDescription("Columns to sort the data by when clustering");

public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions
@@ -20,6 +20,7 @@

import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.format.FilePathUtils;

@@ -42,7 +43,10 @@ public static boolean insertClustering(Configuration conf) {
* Returns whether the insert is clustering disabled with given configuration {@code conf}.
*/
public static boolean isAppendMode(Configuration conf) {
- return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER);
+ // 1. inline clustering is supported for COW table;
+ // 2. async clustering is supported for both COW and MOR table
+ return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER)
+     || needsScheduleClustering(conf);
}
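Worth noting for reviewers: `&&` binds tighter than `||`, so the new return reads as "(COW insert with inline clustering off) OR (clustering scheduling requested)". A parenthesized restatement:

```java
// Equivalent parenthesized form of the predicate above (descriptive names only).
public class AppendModeSketch {
  static boolean isAppendMode(boolean isCow, boolean isInsert, boolean inlineCluster, boolean scheduleClustering) {
    return (isCow && isInsert && !inlineCluster) || scheduleClustering;
  }
}
```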

/**
@@ -115,4 +119,49 @@ public static boolean emitChangelog(Configuration conf) {
return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
&& conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
}

+ /**
+  * Returns whether there is need to schedule the async compaction.
+  *
+  * @param conf The flink configuration.
+  */
+ public static boolean needsAsyncCompaction(Configuration conf) {
+   return OptionsResolver.isMorTable(conf)
+       && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+ }
+
+ /**
+  * Returns whether there is need to schedule the compaction plan.
+  *
+  * @param conf The flink configuration.
+  */
+ public static boolean needsScheduleCompaction(Configuration conf) {
+   return OptionsResolver.isMorTable(conf)
+       && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
+ }
+
+ /**
+  * Returns whether there is need to schedule the async clustering.
+  *
+  * @param conf The flink configuration.
+  */
+ public static boolean needsAsyncClustering(Configuration conf) {
+   return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED);
+ }
+
+ /**
+  * Returns whether there is need to schedule the clustering plan.
+  *
+  * @param conf The flink configuration.
+  */
+ public static boolean needsScheduleClustering(Configuration conf) {
+   return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);
+ }
+
+ /**
+  * Returns whether the clustering sort is enabled.
+  */
+ public static boolean sortClusteringEnabled(Configuration conf) {
+   return !StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS));
+ }
}
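A usage sketch for the new resolvers (the pipeline-builder context is hypothetical; the `OptionsResolver` calls are the methods added above). Note also that `sortClusteringEnabled` leans on the earlier change of `clustering.plan.strategy.sort.columns` from no default to an empty-string default: `StringUtils.isNullOrEmpty` can then treat "unset" and "disabled" uniformly without a null guard.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.OptionsResolver;

// Hypothetical branching a pipeline builder might do with the new resolvers.
public class ClusteringPipelineSketch {
  public static String describe(Configuration conf) {
    if (OptionsResolver.needsAsyncClustering(conf)) {
      return "insert + async clustering operators in the same job";
    }
    if (OptionsResolver.needsScheduleClustering(conf)) {
      return "insert + clustering plan scheduling only (execution elsewhere)";
    }
    return "plain insert pipeline";
  }
}
```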
@@ -37,6 +37,7 @@
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
+ import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;

@@ -253,6 +254,11 @@ public void notifyCheckpointComplete(long checkpointId) {
CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
}

+ if (tableState.scheduleClustering) {
+   // if async clustering is on, schedule the clustering
+   ClusteringUtil.scheduleClustering(conf, writeClient, committed);
+ }

if (committed) {
// start new instant.
startInstant();
@@ -607,6 +613,7 @@ private static class TableState implements Serializable {
final String commitAction;
final boolean isOverwrite;
final boolean scheduleCompaction;
+ final boolean scheduleClustering;
final boolean syncHive;
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;
@@ -616,7 +623,8 @@ private TableState(Configuration conf) {
this.commitAction = CommitUtils.getCommitActionType(this.operationType,
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
- this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
+ this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
+ this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
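Condensing the coordinator hunks: on checkpoint completion the instant is committed, compaction and (now) clustering plans are optionally scheduled, and a new instant starts only after a successful commit. A dependency-free toy of that ordering:

```java
// Toy model of the checkpoint-complete ordering (no Hudi types; names are descriptive).
public class CheckpointCompleteSketch {
  interface Scheduler { void schedule(boolean committed); }

  static void onCheckpointComplete(boolean committed,
                                   boolean scheduleCompaction, boolean scheduleClustering,
                                   Scheduler compaction, Scheduler clustering, Runnable startInstant) {
    if (scheduleCompaction) {
      compaction.schedule(committed);
    }
    if (scheduleClustering) { // branch added by this PR
      clustering.schedule(committed);
    }
    if (committed) {
      startInstant.run(); // begin the next write instant
    }
  }
}
```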
@@ -24,7 +24,7 @@
import java.util.List;

/**
- * Represents a commit event from the clustering task {@link ClusteringFunction}.
+ * Represents a commit event from the clustering task {@link ClusteringOperator}.
*/
public class ClusteringCommitEvent implements Serializable {
private static final long serialVersionUID = 1L;
@@ -51,6 +51,10 @@ public ClusteringCommitEvent(String instant, List<WriteStatus> writeStatuses, int taskID) {
this.taskID = taskID;
}

+ public ClusteringCommitEvent(String instant, int taskID) {
+   this(instant, null, taskID);
+ }

public void setInstant(String instant) {
this.instant = instant;
}
@@ -74,4 +78,8 @@ public List<WriteStatus> getWriteStatuses() {
public int getTaskID() {
return taskID;
}

+ public boolean isFailed() {
+   return this.writeStatuses == null;
+ }
}
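A usage sketch for the new failure convention (instant string and task id are made up; the import of ClusteringCommitEvent is omitted and assumed available): the two-argument constructor leaves `writeStatuses` null, which `isFailed()` reports.

```java
import java.util.Collections;

// Hypothetical usage of the new constructor and isFailed() convention.
public class ClusteringCommitEventUsage {
  public static void main(String[] args) {
    ClusteringCommitEvent success =
        new ClusteringCommitEvent("20220101000000", Collections.emptyList(), 0);
    ClusteringCommitEvent failure = new ClusteringCommitEvent("20220101000000", 0);
    System.out.println(success.isFailed()); // false
    System.out.println(failure.isFailed()); // true
  }
}
```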