diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 92dda123fed47..4824c757cd9df 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -35,6 +35,7 @@
 import org.apache.hudi.common.model.RewriteAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.FutureUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -88,16 +89,17 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
-    // execute clustering for each group async and collect WriteStatus
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     // execute clustering for each group async and collect WriteStatus
-    Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = clusteringPlan.getInputGroups().stream()
+    Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = FutureUtils.allOf(
+        clusteringPlan.getInputGroups().stream()
         .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
             clusteringPlan.getStrategy().getStrategyParams(),
             Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
             instantTime))
-        .map(CompletableFuture::join);
-
+        .collect(Collectors.toList()))
+        .join()
+        .stream();
     JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
     JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
 
@@ -145,7 +147,6 @@ protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> st
     }
   }
 
-
   /**
    * Submit job to execute clustering for the group.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java
new file mode 100644
index 0000000000000..b0029917eebdf
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * A utility class for future operations.
+ */
+public class FutureUtils {
+
+  /**
+   * Combines a list of {@link CompletableFuture}s executing in parallel.
+   *
+   * @param futures list of {@link CompletableFuture}s
+   * @return a new {@link CompletableFuture} that completes when all of the given
+   *         futures complete, yielding their results as a list
+   */
+  public static <T> CompletableFuture<List<T>> allOf(@Nonnull List<CompletableFuture<T>> futures) {
+    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+        .thenApply(aVoid ->
+            futures.stream()
+                // NOTE: This join wouldn't block, since all the
+                //       futures are completed at this point.
+                .map(CompletableFuture::join)
+                .collect(Collectors.toList()));
+  }
+}
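
For reviewers, a minimal standalone sketch (not part of the patch) of the behavior this change buys: Java streams are lazily evaluated, so the previous pipeline's trailing .map(CompletableFuture::join) created and awaited one group's future at a time, effectively serializing the clustering groups. Collecting all futures first and only then joining through FutureUtils.allOf lets every group run concurrently. The FutureUtilsExample class, the clusterGroup helper, and the pool size below are hypothetical stand-ins; only FutureUtils.allOf comes from this patch.

import org.apache.hudi.common.util.FutureUtils;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FutureUtilsExample {

  // Hypothetical stand-in for the per-group work done by runClusteringForGroupAsync.
  private static String clusterGroup(int groupId) {
    return "clustered group " + groupId;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      // Create ALL futures up front so the groups execute concurrently...
      List<CompletableFuture<String>> futures = IntStream.range(0, 3)
          .mapToObj(id -> CompletableFuture.supplyAsync(() -> clusterGroup(id), pool))
          .collect(Collectors.toList());
      // ...then block once until every future has completed.
      List<String> results = FutureUtils.allOf(futures).join();
      results.forEach(System.out::println);
    } finally {
      pool.shutdown();
    }
  }
}

Note that allOf preserves the input order of results, and if any task fails, the combined future completes exceptionally, so join() surfaces the failure as a CompletionException.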