diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java index 15ead5efb0080..ab97204c079b1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java @@ -48,21 +48,23 @@ public class ClusteringPlanActionExecutor> extraMetadata; public ClusteringPlanActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - Option> extraMetadata) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + Option> extraMetadata) { super(context, config, table, instantTime); this.extraMetadata = extraMetadata; } protected Option createClusteringPlan() { LOG.info("Checking if clustering needs to be run on " + config.getBasePath()); - Option lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant(); + Option lastClusteringInstant = table.getActiveTimeline() + .filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant(); int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE) .countInstants(); + if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "