diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 9732c52ac4203..5e6f7c135c6a3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -154,6 +154,15 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("Config to control frequency of async clustering");
 
+  public static final ConfigProperty<Integer> CLUSTERING_MAX_PARALLELISM = ConfigProperty
+      .key("hoodie.clustering.max.parallelism")
+      .defaultValue(15)
+      .sinceVersion("0.14.0")
+      .withDocumentation("Maximum number of parallelism jobs submitted in clustering operation. "
+          + "If the resource is sufficient(Like Spark engine has enough idle executors), increasing this "
+          + "value will let the clustering job run faster, while it will give additional pressure to the "
+          + "execution engines to manage more concurrent running jobs.");
+
   public static final ConfigProperty<String> PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.skipfromlatest.partitions")
       .defaultValue("0")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index eba9728777f19..7b672abf241a3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1634,6 +1634,10 @@ public String getClusteringPlanStrategyClass() {
     return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME);
   }
 
+  public int getClusteringMaxParallelism() {
+    return getInt(HoodieClusteringConfig.CLUSTERING_MAX_PARALLELISM);
+  }
+
   public ClusteringPlanPartitionFilterMode getClusteringPlanPartitionFilterMode() {
     String mode = getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME);
     return ClusteringPlanPartitionFilterMode.valueOf(mode);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 540da42fd78cd..c6a1df9105ebd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -36,6 +36,7 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.FutureUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -82,6 +83,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -105,30 +108,39 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
-    // execute clustering for each group async and collect WriteStatus
-    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-        clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> {
-              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
-                return runClusteringForGroupAsyncAsRow(inputGroup,
-                    clusteringPlan.getStrategy().getStrategyParams(),
-                    shouldPreserveMetadata,
-                    instantTime);
-              }
-              return runClusteringForGroupAsync(inputGroup,
-                  clusteringPlan.getStrategy().getStrategyParams(),
-                  shouldPreserveMetadata,
-                  instantTime);
-            })
-            .collect(Collectors.toList()))
-        .join()
-        .stream();
-    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
-    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
-
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
-    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
-    return writeMetadata;
+    ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
+        Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
+        new CustomizedThreadFactory("clustering-job-group", true));
+    try {
+      // execute clustering for each group async and collect WriteStatus
+      Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
+          clusteringPlan.getInputGroups().stream()
+              .map(inputGroup -> {
+                if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                  return runClusteringForGroupAsyncAsRow(inputGroup,
+                      clusteringPlan.getStrategy().getStrategyParams(),
+                      shouldPreserveMetadata,
+                      instantTime,
+                      clusteringExecutorService);
+                }
+                return runClusteringForGroupAsync(inputGroup,
+                    clusteringPlan.getStrategy().getStrategyParams(),
+                    shouldPreserveMetadata,
+                    instantTime,
+                    clusteringExecutorService);
+              })
+              .collect(Collectors.toList()))
+          .join()
+          .stream();
+      JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
+      JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
+
+      HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
+      writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
+      return writeMetadata;
+    } finally {
+      clusteringExecutorService.shutdown();
+    }
   }
 
   /**
@@ -216,7 +228,8 @@ private BulkInsertPartitioner<T> getPartitioner(Map<String, String> strategy
    * Submit job to execute clustering for the group using Avro/HoodieRecord representation.
    */
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
-                                                                                boolean preserveHoodieMetadata, String instantTime) {
+                                                                                boolean preserveHoodieMetadata, String instantTime,
+                                                                                ExecutorService clusteringExecutorService) {
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
       HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
@@ -229,7 +242,7 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(Ho
           .collect(Collectors.toList());
       return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
           readerSchema, inputFileIds, preserveHoodieMetadata, clusteringGroup.getExtraMetadata());
-    });
+    }, clusteringExecutorService);
   }
 
   /**
@@ -238,7 +251,8 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(Ho
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,
                                                                                      Map<String, String> strategyParams,
                                                                                      boolean shouldPreserveHoodieMetadata,
-                                                                                     String instantTime) {
+                                                                                     String instantTime,
+                                                                                     ExecutorService clusteringExecutorService) {
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
       Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
@@ -248,7 +262,7 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsR
           .collect(Collectors.toList());
       return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
           readerSchema, inputFileIds, shouldPreserveHoodieMetadata, clusteringGroup.getExtraMetadata());
-    });
+    }, clusteringExecutorService);
   }
 
   /**