From e8f46d2b1e05de82a5476d7de639f39aa015b840 Mon Sep 17 00:00:00 2001 From: lewinma Date: Wed, 3 Aug 2022 08:16:14 +0800 Subject: [PATCH] [HUDI-4477] Adjust partition number of flink sink task --- .../src/main/java/org/apache/hudi/sink/utils/Pipelines.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 31355255f905b..f89bdb2606b01 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -56,7 +56,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.data.RowData; @@ -136,8 +135,8 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT // shuffle by partition keys // use #partitionCustom instead of #keyBy to avoid duplicate sort operations, // see BatchExecutionUtils#applyBatchExecutionSettings for details. - Partitioner partitioner = (key, channels) -> - KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels); + Partitioner partitioner = (key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator(key, + KeyGroupRangeAssignment.computeDefaultMaxParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)), channels); dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath); } if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {