diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 28a669075da3c..f08cb5315a30d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -102,9 +102,15 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
       Map<String, String> bucketIdToFileId = new HashMap<>();
-      dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
-          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+      if (OptionsResolver.isPartitionedTable(conf)) {
+        dataStream = dataStream.keyBy(keyGen::getPartitionPath)
+            .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+      } else {
+        dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+            .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+      }
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
         dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
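
Note on the change above: for partitioned tables the stream is now keyed by partition path, so records of the same partition are hash-routed to the same write subtask, while non-partitioned tables keep the custom bucket-id partitioning on the record key. Below is a minimal, self-contained Flink sketch of that branching only; it is not the Hudi code. The Record POJO, the isPartitioned flag, and the bucket hashing are hypothetical stand-ins for RowData, OptionsResolver.isPartitionedTable(conf), and the bucket index partitioner.

// Minimal routing sketch (assumes only the plain Flink DataStream API; all names are illustrative).
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BucketRoutingSketch {

  // Hypothetical record type standing in for Hudi's RowData.
  public static class Record {
    public String key;        // record key
    public String partition;  // partition path

    public Record() {}

    public Record(String key, String partition) {
      this.key = key;
      this.partition = partition;
    }

    @Override
    public String toString() {
      return key + "@" + partition;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    boolean isPartitioned = true; // stand-in for OptionsResolver.isPartitionedTable(conf)
    int numBuckets = 4;           // stand-in for the bucket index's bucket count

    DataStream<Record> input = env.fromElements(
        new Record("id1", "2024-01-01"),
        new Record("id2", "2024-01-01"),
        new Record("id3", "2024-01-02"));

    DataStream<Record> routed;
    if (isPartitioned) {
      // Partitioned table: hash-partition by partition path, mirroring
      // keyBy(keyGen::getPartitionPath) in the patch.
      routed = input.keyBy(new KeySelector<Record, String>() {
        @Override
        public String getKey(Record r) {
          return r.partition;
        }
      });
    } else {
      // Non-partitioned table: route by a bucket id derived from the record key,
      // mirroring partitionCustom(partitioner, keyGen::getRecordKey).
      routed = input.partitionCustom(
          new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
              int bucket = (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
              return bucket % numPartitions;
            }
          },
          new KeySelector<Record, String>() {
            @Override
            public String getKey(Record r) {
              return r.key;
            }
          });
    }

    routed.print();
    env.execute("bucket-routing-sketch");
  }
}

The practical difference is that keyBy applies Flink's key-group hash partitioning, whereas partitionCustom lets the partitioner pick the target subtask index directly; the patch simply selects between the two depending on whether the table is partitioned.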