diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 699f078a700d5..26ad824ce8d8c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -86,8 +86,8 @@ public class FlinkCompactionConfig extends Configuration { @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.", required = false) public Long compactionTargetIo = 512000L; - @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10", required = false) - public Integer compactionTasks = 10; + @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1", required = false) + public Integer compactionTasks = -1; /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 3e0a4375b7efd..8ee6c111eb52d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -112,7 +112,8 @@ public static void main(String[] args) throws Exception { } // get compactionParallelism. - int compactionParallelism = Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), compactionPlan.getOperations().size()); + int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 + ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) .name("compaction_source")