diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 28a669075da3c..3b2ee39528a8b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -173,9 +173,19 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT * @param conf The configuration * @param rowType The input row type * @param dataStream The input data stream + * @param bounded Whether the input stream is bounded * @return the appending data stream sink */ - public static DataStreamSink append(Configuration conf, RowType rowType, DataStream dataStream) { + public static DataStreamSink append( + Configuration conf, + RowType rowType, + DataStream dataStream, + boolean bounded) { + if (!bounded) { + // In principle, the config should be immutable, but the boundedness + // is only visible when creating the sink pipeline. + conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false); + } WriteOperatorFactory operatorFactory = AppendWriteOperator.getFactory(conf, rowType); return dataStream diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index ed99e7b4c1c3d..4dd4f89d03c1d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -78,7 +78,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { // Append mode if (OptionsResolver.isAppendMode(conf)) { - return Pipelines.append(conf, rowType, dataStream); + return Pipelines.append(conf, rowType, dataStream, context.isBounded()); } // default parallelism