@@ -154,6 +154,15 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("Config to control frequency of async clustering");
 
+  public static final ConfigProperty<Integer> CLUSTERING_MAX_PARALLELISM = ConfigProperty
+      .key("hoodie.clustering.max.parallelism")
+      .defaultValue(15)
+      .sinceVersion("0.14.0")
+      .withDocumentation("Maximum number of parallel clustering jobs submitted within one clustering operation. "
+          + "If resources are sufficient (e.g., the Spark engine has enough idle executors), increasing this "
+          + "value speeds up the clustering job, at the cost of additional pressure on the "
+          + "execution engine to manage more concurrently running jobs.");
+
   public static final ConfigProperty<String> PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.skipfromlatest.partitions")
       .defaultValue("0")
@@ -1634,6 +1634,10 @@ public String getClusteringPlanStrategyClass() {
     return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME);
   }
 
+  public int getClusteringMaxParallelism() {
+    return getInt(HoodieClusteringConfig.CLUSTERING_MAX_PARALLELISM);
+  }
+
   public ClusteringPlanPartitionFilterMode getClusteringPlanPartitionFilterMode() {
     String mode = getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME);
     return ClusteringPlanPartitionFilterMode.valueOf(mode);
@@ -36,6 +36,7 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.FutureUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -82,6 +83,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -105,30 +108,39 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
-    // execute clustering for each group async and collect WriteStatus
-    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-        clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> {
-              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
-                return runClusteringForGroupAsyncAsRow(inputGroup,
-                    clusteringPlan.getStrategy().getStrategyParams(),
-                    shouldPreserveMetadata,
-                    instantTime);
-              }
-              return runClusteringForGroupAsync(inputGroup,
-                  clusteringPlan.getStrategy().getStrategyParams(),
-                  shouldPreserveMetadata,
-                  instantTime);
-            })
-            .collect(Collectors.toList()))
-        .join()
-        .stream();
-    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
-    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
-
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
-    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
-    return writeMetadata;
+    ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
+        Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
+        new CustomizedThreadFactory("clustering-job-group", true));
+    try {
+      // execute clustering for each group async and collect WriteStatus
+      Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
+          clusteringPlan.getInputGroups().stream()
+              .map(inputGroup -> {
+                if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                  return runClusteringForGroupAsyncAsRow(inputGroup,
+                      clusteringPlan.getStrategy().getStrategyParams(),
+                      shouldPreserveMetadata,
+                      instantTime,
+                      clusteringExecutorService);
+                }
+                return runClusteringForGroupAsync(inputGroup,
+                    clusteringPlan.getStrategy().getStrategyParams(),
+                    shouldPreserveMetadata,
+                    instantTime,
+                    clusteringExecutorService);
+              })
+              .collect(Collectors.toList()))
+          .join()
+          .stream();
+      JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
+      JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
+
+      HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
+      writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
+      return writeMetadata;
+    } finally {
+      clusteringExecutorService.shutdown();
+    }
   }
 
   /**
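The core pattern of this hunk, a fixed-size pool capped at min(number of input groups, `hoodie.clustering.max.parallelism`), daemon threads, fan-out via `CompletableFuture`, and shutdown in a `finally` block, can be sketched with plain JDK types. `CustomizedThreadFactory` is Hudi's own `ThreadFactory`, so the inline factory below is a stand-in, and the group count and cap are made-up values:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BoundedFanOut {
  public static void main(String[] args) {
    int inputGroups = 40;    // stand-in for clusteringPlan.getInputGroups().size()
    int maxParallelism = 15; // stand-in for writeConfig.getClusteringMaxParallelism()

    // Pool capped at maxParallelism; daemon threads mirror CustomizedThreadFactory("...", true).
    ExecutorService pool = Executors.newFixedThreadPool(
        Math.min(inputGroups, maxParallelism),
        r -> {
          Thread t = new Thread(r, "clustering-job-group");
          t.setDaemon(true);
          return t;
        });
    try {
      // Fan out one future per group onto the bounded pool, then wait for all of them.
      List<CompletableFuture<String>> futures = IntStream.range(0, inputGroups)
          .mapToObj(i -> CompletableFuture.supplyAsync(() -> "group-" + i + " done", pool))
          .collect(Collectors.toList());
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      futures.forEach(f -> System.out.println(f.join()));
    } finally {
      pool.shutdown(); // mirrors the finally block in performClustering
    }
  }
}
```

Sizing the pool with `Math.min` avoids spawning idle threads when a plan has fewer groups than the configured cap, and the `finally` guarantees the pool is released even if any group job fails.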
@@ -216,7 +228,8 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
    * Submit job to execute clustering for the group using Avro/HoodieRecord representation.
    */
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
-                                                                                boolean preserveHoodieMetadata, String instantTime) {
+                                                                                boolean preserveHoodieMetadata, String instantTime,
+                                                                                ExecutorService clusteringExecutorService) {
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
       HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
@@ -229,7 +242,7 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(Ho
           .collect(Collectors.toList());
       return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata,
           clusteringGroup.getExtraMetadata());
-    });
+    }, clusteringExecutorService);
   }
 
   /**
@@ -238,7 +251,8 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsR
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,
                                                                                      Map<String, String> strategyParams,
                                                                                      boolean shouldPreserveHoodieMetadata,
-                                                                                     String instantTime) {
+                                                                                     String instantTime,
+                                                                                     ExecutorService clusteringExecutorService) {
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
       Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
@@ -248,7 +262,7 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsR
           .collect(Collectors.toList());
       return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,
           clusteringGroup.getExtraMetadata());
-    });
+    }, clusteringExecutorService);
   }
 
   /**
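Why both helpers now take an `ExecutorService`: the one-argument `CompletableFuture.supplyAsync(Supplier)` runs on `ForkJoinPool.commonPool()`, so group jobs would be bounded by the JVM-wide common pool rather than by `hoodie.clustering.max.parallelism`. The two-argument overload pins the work to the dedicated, capped pool. A minimal JDK-only illustration of the difference:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncOverloads {
  public static void main(String[] args) {
    ExecutorService dedicated = Executors.newFixedThreadPool(2);
    try {
      // Runs on ForkJoinPool.commonPool(): thread named like "ForkJoinPool.commonPool-worker-1".
      CompletableFuture<String> onCommon =
          CompletableFuture.supplyAsync(() -> Thread.currentThread().getName());
      // Runs on the supplied executor: thread named like "pool-1-thread-1".
      CompletableFuture<String> onDedicated =
          CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), dedicated);
      System.out.println("common    : " + onCommon.join());
      System.out.println("dedicated : " + onDedicated.join());
    } finally {
      dedicated.shutdown();
    }
  }
}
```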