diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 1f233b429789d..48ccce1d1740c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -73,7 +73,7 @@ private static List<String> getHashKeysUsingIndexFields(String recordKey, List<S
         .map(p -> p.split(":"))
         .collect(Collectors.toMap(p -> p[0], p -> p[1]));
     return indexKeyFields.stream()
-        .map(f -> recordKeyPairs.get(f)).collect(Collectors.toList());
+        .map(recordKeyPairs::get).collect(Collectors.toList());
   }
 
   public static String partitionBucketIdStr(String partition, int bucketId) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index b6fecff2042cc..3f84b2799ae56 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.bulk;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -127,6 +128,10 @@ public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
         keyGeneratorOpt);
   }
 
+  public HoodieKey getHoodieKey(RowData rowData) {
+    return new HoodieKey(getRecordKey(rowData), getPartitionPath(rowData));
+  }
+
   public String getRecordKey(RowData rowData) {
     if (this.simpleRecordKey) {
       return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index b9b737ce22857..5fa3d1ab9a0a2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.partitioner;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -28,7 +29,7 @@
  *
  * @param <T> The type of obj to hash
  */
-public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
+public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
 
   private final int bucketNum;
   private final String indexKeyFields;
@@ -39,8 +40,9 @@ public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
   }
 
   @Override
-  public int partition(String key, int numPartitions) {
+  public int partition(HoodieKey key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
-    return BucketIdentifier.mod(curBucket, numPartitions);
+    int globalHash = (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE;
+    return BucketIdentifier.mod(globalHash, numPartitions);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 3b2ee39528a8b..91ac2beadc080 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -96,13 +97,13 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
 
       String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
-      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
+      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
       RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
       RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
 
       Map<String, String> bucketIdToFileId = new HashMap<>();
-      dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+      dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
           .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
@@ -319,8 +320,8 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
       WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
       int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
-      return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
+      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+      return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
           .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
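
For context, a minimal, self-contained sketch (not part of the patch) of what the new partition(HoodieKey, int) implementation does: it folds the partition path into the hash, so the same bucket id under different partition paths can land on different write subtasks. The class name, the bucketId() helper, and the sample keys below are hypothetical, and the helper plus the plain modulo are simplified stand-ins for BucketIdentifier.getBucketId and BucketIdentifier.mod.

// Sketch only: hypothetical names, simplified stand-ins for the BucketIdentifier helpers.
public class BucketPartitioningSketch {

  // Stand-in for BucketIdentifier.getBucketId: hash the record key into a bucket.
  static int bucketId(String recordKey, int bucketNum) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % bucketNum;
  }

  // Old scheme: the target subtask depends only on the bucket id.
  static int oldPartition(String recordKey, int bucketNum, int numPartitions) {
    return bucketId(recordKey, bucketNum) % numPartitions;
  }

  // New scheme: mix the partition path into the hash, as the patched partition() does.
  static int newPartition(String partitionPath, String recordKey, int bucketNum, int numPartitions) {
    int curBucket = bucketId(recordKey, bucketNum);
    int globalHash = (partitionPath + curBucket).hashCode() & Integer.MAX_VALUE;
    return globalHash % numPartitions;
  }

  public static void main(String[] args) {
    int bucketNum = 4;
    int numPartitions = 8; // write task parallelism
    // The same record key under two partition paths: the old scheme always picks the
    // same subtask, while the new scheme can spread the two rows across subtasks.
    System.out.println(oldPartition("id1", bucketNum, numPartitions));
    System.out.println(newPartition("par1", "id1", bucketNum, numPartitions));
    System.out.println(newPartition("par2", "id1", bucketNum, numPartitions));
  }
}

Note that with the old arithmetic, mod(curBucket, numPartitions) can only ever produce bucketNum distinct values, so when bucketNum is smaller than the write parallelism some subtasks never receive data; hashing the partition path together with the bucket id removes that cap.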