diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index a8e38a4aa86e9..161fb21e7d8a2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -80,9 +80,11 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
 
       // TODO: This is a very time-consuming operation, will optimization
       if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-        hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
-            TypeInformation.of(HoodieRecord.class),
-            new ProcessOperator<>(new BootstrapFunction<>(conf)));
+        hoodieDataStream = hoodieDataStream.rebalance()
+            .transform("index_bootstrap",
+                TypeInformation.of(HoodieRecord.class),
+                new ProcessOperator<>(new BootstrapFunction<>(conf)))
+            .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
       }
 
       DataStream pipeline = hoodieDataStream
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 786023efa335f..84905247c0981 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -180,8 +180,10 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv)
       }
       OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
       SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source")
+          .uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(1)
           .transform("split_reader", typeInfo, factory)
+          .uid("uid_split_reader_" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
       return new DataStreamSource<>(source);
     } else {
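
For context (not part of the diff): the added `.uid(...)` calls pin explicit operator UIDs so that Flink can map operator state back to the same operators when a job is restored from a savepoint after the pipeline topology changes. Below is a minimal, self-contained sketch of the same pattern; the class, operator names, and UID strings are placeholders, not Hudi code.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorUidExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Without an explicit uid, Flink derives operator ids from the job graph topology,
    // so adding or removing an operator later changes the generated ids and the state in
    // an existing savepoint can no longer be matched to its operator on restore.
    DataStream<String> source = env
        .fromElements("a", "b", "c")
        .uid("uid_example_source");   // hypothetical uid, mirrors "uid_streaming_source_<table>"

    source
        .map(String::toUpperCase)
        .uid("uid_example_mapper")    // stable id keeps this operator's state addressable
        .print();

    env.execute("operator-uid-example");
  }
}
```

Suffixing each UID with `FlinkOptions.TABLE_NAME`, as the patch does, presumably keeps the ids unique when several Hudi pipelines run in the same Flink job.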