BucketIdentifier.java
@@ -73,7 +73,7 @@ private static List<String> getHashKeysUsingIndexFields(String recordKey, List<S
         .map(p -> p.split(":"))
         .collect(Collectors.toMap(p -> p[0], p -> p[1]));
     return indexKeyFields.stream()
-        .map(f -> recordKeyPairs.get(f)).collect(Collectors.toList());
+        .map(recordKeyPairs::get).collect(Collectors.toList());
   }
 
   public static String partitionBucketIdStr(String partition, int bucketId) {
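For context, the method touched by this hunk extracts the bucket hash keys from a complex record key of the form "field1:value1,field2:value2"; the refactor only swaps the lambda for an equivalent method reference. A standalone sketch of that behavior, assuming the leading split on "," (which sits above the shown hunk) and using made-up class and method names:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HashKeySketch {

  // Parse "field:value" pairs out of the record key, then pick the values
  // of the configured index key fields, mirroring getHashKeysUsingIndexFields.
  static List<String> getHashKeys(String recordKey, List<String> indexKeyFields) {
    Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
        .map(p -> p.split(":"))
        .collect(Collectors.toMap(p -> p[0], p -> p[1]));
    return indexKeyFields.stream()
        .map(recordKeyPairs::get) // behaviorally identical to the old lambda f -> recordKeyPairs.get(f)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Record key "id:1,name:foo" with index key field "id" prints [1].
    System.out.println(getHashKeys("id:1,name:foo", Arrays.asList("id")));
  }
}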
RowDataKeyGen.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.bulk;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -127,6 +128,10 @@ public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
         keyGeneratorOpt);
   }
 
+  public HoodieKey getHoodieKey(RowData rowData) {
+    return new HoodieKey(getRecordKey(rowData), getPartitionPath(rowData));
+  }
+
   public String getRecordKey(RowData rowData) {
     if (this.simpleRecordKey) {
       return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
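The new getHoodieKey(RowData) simply pairs the existing record key and partition path into a HoodieKey, so the downstream partitioner can see both. A minimal illustration of that contract; the literal values are hypothetical stand-ins for what getRecordKey/getPartitionPath might return, and only HoodieKey and its accessors come from hudi-common:

import org.apache.hudi.common.model.HoodieKey;

public class HoodieKeySketch {
  public static void main(String[] args) {
    // Hypothetical outputs of RowDataKeyGen#getRecordKey / #getPartitionPath.
    String recordKey = "uuid:id1";
    String partitionPath = "par1";

    // The new method wraps both values into one key object.
    HoodieKey key = new HoodieKey(recordKey, partitionPath);
    System.out.println(key.getRecordKey() + " @ " + key.getPartitionPath());
  }
}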
BucketIndexPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.partitioner;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -28,7 +29,7 @@
  *
  * @param <T> The type of obj to hash
  */
-public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
+public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
 
   private final int bucketNum;
   private final String indexKeyFields;
@@ -39,8 +40,9 @@ public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
   }
 
   @Override
-  public int partition(String key, int numPartitions) {
+  public int partition(HoodieKey key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
-    return BucketIdentifier.mod(curBucket, numPartitions);
+    int globalHash = (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE;
+    return BucketIdentifier.mod(globalHash, numPartitions);
   }
 }
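This is the behavioral core of the change: previously partition(...) returned mod(bucketId, numPartitions), so bucket N of every Hudi partition landed on the same Flink subtask; now the partition path is mixed into the hash, so identical bucket ids from different partitions can spread across subtasks. A dependency-free sketch of the before/after mapping; the mod helper here is a plain %, which agrees with BucketIdentifier.mod for the non-negative hashes used, and the class and method names are made up:

public class BucketSpreadSketch {

  // Plain % stands in for BucketIdentifier.mod; inputs are non-negative here.
  static int mod(int hash, int numPartitions) {
    return hash % numPartitions;
  }

  // Old scheme: the Flink subtask depends on the bucket id only.
  static int oldSubtask(int bucketId, int numPartitions) {
    return mod(bucketId, numPartitions);
  }

  // New scheme: the partition path is concatenated with the bucket id before hashing,
  // mirroring (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE above.
  static int newSubtask(String partitionPath, int bucketId, int numPartitions) {
    int globalHash = (partitionPath + bucketId).hashCode() & Integer.MAX_VALUE;
    return mod(globalHash, numPartitions);
  }

  public static void main(String[] args) {
    String[] partitions = {"2022-01-01", "2022-01-02", "2022-01-03"};
    int bucketId = 0;       // the same bucket id exists in every Hudi partition
    int numPartitions = 4;  // Flink write parallelism
    for (String p : partitions) {
      // Old: always subtask 0. New: different partitions can land on different subtasks.
      System.out.printf("partition=%s  old subtask=%d  new subtask=%d%n",
          p, oldSubtask(bucketId, numPartitions), newSubtask(p, bucketId, numPartitions));
    }
  }
}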
Pipelines.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -96,13 +97,13 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
     String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
     int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
 
-    BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
+    BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
     RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
     RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
     InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
 
     Map<String, String> bucketIdToFileId = new HashMap<>();
-    dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+    dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
         .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
     if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
@@ -319,8 +320,8 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
     WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
     int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
     String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-    BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
-    return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
+    BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+    return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
        .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
        .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
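Both call sites above only change the key handed to Flink's partitionCustom: from the record key string to the full HoodieKey (keyGen::getHoodieKey on the row-data path, HoodieRecord::getKey on the record path), so the partitioner can read the partition path as well. A self-contained Flink sketch of the same wiring pattern, with a plain string standing in for HoodieKey and a toy partitioner standing in for BucketIndexPartitioner; none of the names below are Hudi code:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionCustomSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // (partitionPath, recordKey) pairs; the same record key appears under two partitions.
    env.fromElements(
            Tuple2.of("par1", "id1"),
            Tuple2.of("par2", "id1"),
            Tuple2.of("par2", "id2"))
        .partitionCustom(
            // Toy partitioner: hash the composite key, in the role of BucketIndexPartitioner.
            new Partitioner<String>() {
              @Override
              public int partition(String key, int numPartitions) {
                return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
              }
            },
            // Key selector in the role of keyGen::getHoodieKey / HoodieRecord::getKey.
            new KeySelector<Tuple2<String, String>, String>() {
              @Override
              public String getKey(Tuple2<String, String> value) {
                return value.f0 + "|" + value.f1; // partitionPath + recordKey
              }
            })
        .print();

    env.execute("partition-custom-sketch");
  }
}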