diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ade550897765a..205da82ac145d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -112,10 +112,8 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
     }
   }
 
-  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
+  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
     context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
-    Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-        table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
     UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
         .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
     Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
@@ -166,7 +164,9 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
-    HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
+    Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+        table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+    HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
 
     context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
     HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
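
For context: the change hoists the pending-clustering file-group lookup out of clusteringHandleUpdate and into execute, so that when the set is empty the caller can skip the update-handling path entirely, including the reflection-based UpdateStrategy instantiation. Below is a minimal, self-contained sketch of that guard pattern under illustrative assumptions; PendingClusteringGuard, process, and handleUpdates are hypothetical names, not Hudi APIs.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Illustrative sketch only; not part of the Hudi codebase.
public final class PendingClusteringGuard {

  // Stands in for table.getFileSystemView().getFileGroupsInPendingClustering():
  // a potentially expensive lookup that the diff hoists to the call site.
  private final Supplier<Set<String>> pendingFileGroups;

  public PendingClusteringGuard(Supplier<Set<String>> pendingFileGroups) {
    this.pendingFileGroups = pendingFileGroups;
  }

  public List<String> process(List<String> records) {
    // Materialize the set once, then short-circuit on the common empty case
    // instead of unconditionally entering the update-handling path.
    Set<String> pending = pendingFileGroups.get();
    return pending.isEmpty() ? records : handleUpdates(records, pending);
  }

  private List<String> handleUpdates(List<String> records, Set<String> pending) {
    // In Hudi this is where the UpdateStrategy is reflectively loaded and
    // applied; here we merely tag affected records for illustration.
    return records.stream()
        .map(r -> pending.contains(r) ? r + ":pending" : r)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    PendingClusteringGuard guard = new PendingClusteringGuard(Collections::emptySet);
    // Prints [fg1, fg2]: no pending clustering, so handleUpdates is never invoked.
    System.out.println(guard.process(Arrays.asList("fg1", "fg2")));
  }
}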