diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 2389aa7fc1e02..d52c407028a21 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -183,6 +183,10 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("The value of hbase.master.kerberos.principal in hbase cluster.");
 
+  public static final ConfigProperty<Integer> BUCKET_NUMBER = ConfigProperty
+      .key("hoodie.index.hbase.bucket.number")
+      .defaultValue(8)
+      .withDocumentation("Only applicable when using RebalancedSparkHoodieHBaseIndex. Setting this to the region count of the HBase index table gives the best performance.");
 
   /**
    * @deprecated Use {@link #ZKQUORUM} and its methods instead
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1603965ea987f..24e3d36af70ae 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1557,6 +1557,10 @@ public boolean getHbaseIndexUpdatePartitionPath() {
     return getBooleanOrDefault(HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE);
   }
 
+  public int getHBaseIndexRegionCount() {
+    return getInt(HoodieHBaseIndexConfig.BUCKET_NUMBER);
+  }
+
   public int getBloomIndexParallelism() {
     return getInt(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/RebalancedSparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/RebalancedSparkHoodieHBaseIndex.java
new file mode 100644
index 0000000000000..0ee1bef98f8a0
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/RebalancedSparkHoodieHBaseIndex.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.hbase;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+
+/**
+ * Extends {@link SparkHoodieHBaseIndex} and prefixes each record key with a hash-based bucket id,
+ * so that index reads and writes spread evenly across HBase regions instead of skewing onto a few.
+ */
+public class RebalancedSparkHoodieHBaseIndex extends SparkHoodieHBaseIndex {
+
+  public RebalancedSparkHoodieHBaseIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  protected String getHBaseKey(String originalKey) {
+    // Math.floorMod (unlike Math.abs) stays non-negative even when hashCode() is Integer.MIN_VALUE.
+    int bucket = Math.floorMod(originalKey.hashCode(), config.getHBaseIndexRegionCount());
+    // Zero-pad the bucket id to a fixed width so every key carries a prefix of the same length.
+    String bucketStr = String.format("%0" + String.valueOf(config.getHBaseIndexRegionCount() - 1).length() + "d", bucket);
+    return bucketStr + originalKey;
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index f841117d5c3a1..f681789301fe6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -205,7 +205,7 @@ public void close() {
   }
 
   private Get generateStatement(String key) throws IOException {
-    return new Get(Bytes.toBytes(key)).setMaxVersions(1).addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
+    return new Get(Bytes.toBytes(getHBaseKey(key))).setMaxVersions(1).addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
         .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
   }
 
@@ -213,6 +213,10 @@ private Get generateStatement(String key, long startTime, long endTime) throws I
     return generateStatement(key).setTimeRange(startTime, endTime);
   }
 
+  protected String getHBaseKey(String key) {
+    return key;
+  }
+
   private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
     HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
     // Check if the last commit ts for this row is 1) present in the timeline or
@@ -354,14 +358,14 @@ private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateL
             // This is an update, no need to update index
             continue;
           }
-          Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
+          Put put = new Put(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
           put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
           put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
          put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
           mutations.add(put);
         } else {
           // Delete existing index for a deleted record
-          Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
+          Delete delete = new Delete(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
           mutations.add(delete);
         }
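
For review convenience, here is a minimal standalone sketch of the bucketing scheme introduced above, assuming the default of 8 buckets; the class BucketPrefixDemo and its main() harness are illustrative only and not part of the patch. It shows that the prefix is deterministic (the same key always maps to the same bucket, so index lookups still hit the row written earlier) and that keys spread roughly evenly across buckets:

import java.util.HashMap;
import java.util.Map;

public class BucketPrefixDemo {

  private static final int BUCKET_NUMBER = 8; // mirrors hoodie.index.hbase.bucket.number

  // Same logic as RebalancedSparkHoodieHBaseIndex#getHBaseKey.
  static String getHBaseKey(String originalKey) {
    // floorMod keeps the bucket non-negative even for Integer.MIN_VALUE hash codes.
    int bucket = Math.floorMod(originalKey.hashCode(), BUCKET_NUMBER);
    int width = String.valueOf(BUCKET_NUMBER - 1).length();
    return String.format("%0" + width + "d", bucket) + originalKey;
  }

  public static void main(String[] args) {
    int width = String.valueOf(BUCKET_NUMBER - 1).length();
    Map<String, Integer> perBucket = new HashMap<>();
    for (int i = 0; i < 10_000; i++) {
      String prefixed = getHBaseKey("record-" + i);
      perBucket.merge(prefixed.substring(0, width), 1, Integer::sum);
    }
    // Expect roughly 1250 keys per bucket, i.e. index traffic spread over all buckets.
    perBucket.forEach((b, n) -> System.out.println("bucket " + b + ": " + n));
  }
}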
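The prefixes only translate into balanced region load if the index table is split on the same bucket boundaries, which is what the BUCKET_NUMBER documentation's region-count advice amounts to. Below is a hedged sketch of pre-splitting such a table with the HBase 2.x admin API; the table name hudi_index is a placeholder, and the "_s" column family is an assumption intended to match SYSTEM_COLUMN_FAMILY in SparkHoodieHBaseIndex:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitIndexTable {

  public static void main(String[] args) throws Exception {
    int bucketNumber = 8; // keep in sync with hoodie.index.hbase.bucket.number
    int width = String.valueOf(bucketNumber - 1).length();

    // One split key per bucket boundary ("1".."7" for 8 buckets), so each
    // zero-padded prefix lands in its own region.
    byte[][] splitKeys = new byte[bucketNumber - 1][];
    for (int b = 1; b < bucketNumber; b++) {
      splitKeys[b - 1] = Bytes.toBytes(String.format("%0" + width + "d", b));
    }

    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      admin.createTable(
          TableDescriptorBuilder.newBuilder(TableName.valueOf("hudi_index"))
              .setColumnFamily(ColumnFamilyDescriptorBuilder.of("_s"))
              .build(),
          splitKeys);
    }
  }
}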