From d0059712698d0c54aa951095b66c115a38d8b04a Mon Sep 17 00:00:00 2001 From: hehuiyuan1 Date: Sat, 23 Apr 2022 11:35:47 +0800 Subject: [PATCH] support generan parameter 'sink.parallelism' for flink-hudi --- .../main/java/org/apache/hudi/configuration/FlinkOptions.java | 3 +++ .../src/main/java/org/apache/hudi/table/HoodieTableSink.java | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index e2be7d364b77f..c6b16e6ecd29c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -34,6 +34,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.factories.FactoryUtil; import java.lang.reflect.Field; import java.util.ArrayList; @@ -232,6 +233,8 @@ private FlinkOptions() { // ------------------------------------------------------------------------ // Write Options // ------------------------------------------------------------------------ + public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; + public static final ConfigOption TABLE_NAME = ConfigOptions .key(HoodieWriteConfig.TBL_NAME.key()) .stringType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 4dd4f89d03c1d..46e360d3a22eb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -82,7 +82,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } // default parallelism - int parallelism = dataStream.getExecutionConfig().getParallelism(); + int parallelism = conf.getInteger(FlinkOptions.SINK_PARALLELISM, + dataStream.getExecutionConfig().getParallelism()); DataStream pipeline; // bootstrap final DataStream hoodieRecordDataStream =