diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index bee5fe641aecb..cc5f6c030b345 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -99,7 +99,7 @@ public static void main(String[] args) throws Exception { .uid("uid_kafka_source") .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index d648bc0811222..a8e38a4aa86e9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -80,7 +80,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { // TODO: This is a very time-consuming operation, will optimization if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))); }