@@ -102,9 +102,15 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType
     InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);

     Map<String, String> bucketIdToFileId = new HashMap<>();
-    dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
-        .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
-        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+    if (OptionsResolver.isPartitionedTable(conf)) {
+      dataStream = dataStream.keyBy(keyGen::getPartitionPath)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+    } else {
+      dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+    }
     if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
       SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
       dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
Review comment, Contributor:

Why shuffle the input with the partition path first?
Reply, Contributor (Author):

> Why shuffle the input with the partition path first?

  • Shuffling with the custom partitioner was not friendly to a partitioned table with few buckets (e.g. partitioned by day with a single bucket): it would use only 1 write task (see the first sketch below).
  • Keying by the partition path makes sure the same partition is processed in the same task, which avoids duplicated fileIds (see the second sketch below).
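To illustrate the first point, here is a minimal, hypothetical partitioner in the bucket-index style (the class name and hashing scheme are assumptions for illustration, not Hudi's actual partitioner). With numBuckets = 1, every record key maps to bucket 0, so partitionCustom routes the entire input to subtask 0 regardless of how many write tasks are configured:

import org.apache.flink.api.common.functions.Partitioner;

// Hypothetical sketch, not the actual Hudi class: routes a record to a
// subtask by its bucket number, as a bucket-index partitioner would.
public class BucketStylePartitioner implements Partitioner<String> {
  private final int numBuckets;

  public BucketStylePartitioner(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Override
  public int partition(String recordKey, int numPartitions) {
    // Assumed hashing scheme, for illustration only.
    int bucketId = (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
    // With numBuckets = 1 this is always 0: one write task does all the work.
    return bucketId % numPartitions;
  }
}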
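For the second point, a small self-contained sketch (sample values and class names are assumptions; only Flink's keyBy semantics are relied on). It shows that keyBy pins all records with the same partition path to one subtask while distinct paths spread across the write tasks, so per-task state like bucketIdToFileId sees each partition's buckets exactly once:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByPartitionPathDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4); // stands in for FlinkOptions.WRITE_TASKS

    // Sample partition paths of a day-partitioned table.
    env.fromElements("dt=2023-01-01", "dt=2023-01-02", "dt=2023-01-03", "dt=2023-01-01")
        .keyBy(new KeySelector<String, String>() {
          @Override
          public String getKey(String path) {
            return path; // key by the partition path itself
          }
        })
        .map(new RichMapFunction<String, String>() {
          @Override
          public String map(String path) {
            // Both "dt=2023-01-01" records report the same subtask index,
            // so per-task bookkeeping never assigns two fileIds to one bucket.
            return path + " -> subtask " + getRuntimeContext().getIndexOfThisSubtask();
          }
        })
        .print();

    env.execute("keyBy-partition-path-demo");
  }
}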