[HUDI-1347]fix Hbase index partition changes cause data duplication p… #2188
Changes from all commits

The diff below touches `SparkHoodieHBaseIndex`. It adds a synchronous rollback path to the HBase index: when a commit is rolled back, index entries written by that commit are restored to their pre-commit values, or deleted if the commit created them, instead of leaving stale partition paths behind and duplicating records whose partition changed.
```diff
@@ -18,6 +18,8 @@
 package org.apache.hudi.index.hbase;
 
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkMemoryUtils;
```
```diff
@@ -28,6 +30,7 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.RateLimiter;
```
```diff
@@ -67,6 +70,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
```
```diff
@@ -181,6 +185,10 @@ private Get generateStatement(String key) throws IOException {
         .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
   }
 
+  private Get generateStatement(String key, long startTime, long endTime) throws IOException {
+    return generateStatement(key).setTimeRange(startTime, endTime);
+  }
+
   private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
     HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
     // Check if the last commit ts for this row is 1) present in the timeline or
```
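The new `generateStatement(String, long, long)` overload is what lets the rollback read the index "as of" a point in time: HBase keeps multiple timestamped versions of each cell, and a `Get` restricted with `setTimeRange` returns the newest version inside that range. Below is a minimal standalone sketch of the same trick, assuming a reachable HBase cluster; the table name, record key, and instant value are hypothetical, and the `yyyyMMddHHmmss` pattern is the format Hudi uses for instant times (what `HoodieActiveTimeline.COMMIT_FORMATTER` parses in the rollback hunk further down).

```java
import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class TimeRangeGetSketch {
  public static void main(String[] args) throws Exception {
    // Hudi instant times are wall-clock strings like 20201019123030;
    // parsing one yields the epoch millis at which the commit started.
    long rollbackTime = new SimpleDateFormat("yyyyMMddHHmmss")
        .parse("20201019123030").getTime(); // hypothetical instant

    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("hudi_index"))) { // hypothetical table
      // HBase time ranges are [min, max), so this Get returns the newest
      // cell versions written strictly BEFORE the rolled-back commit.
      Get get = new Get(Bytes.toBytes("some-record-key")); // hypothetical key
      get.setTimeRange(0L, rollbackTime);
      Result previous = table.get(get);
      if (previous.getRow() == null) {
        System.out.println("No pre-commit version: the entry was created by this commit");
      } else {
        System.out.println("Found the entry's pre-commit state");
      }
    }
  }
}
```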
```diff
@@ -537,7 +545,72 @@ private Integer getNumRegionServersAliveForTable()
 
   @Override
   public boolean rollbackCommit(String instantTime) {
-    // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
+    int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
+    boolean rollbackSync = config.getHBaseIndexRollbackSync();
+
+    if (!config.getHBaseIndexRollbackSync()) {
+      // Default Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
+      return true;
+    }
+
+    synchronized (SparkHoodieHBaseIndex.class) {
+      if (hbaseConnection == null || hbaseConnection.isClosed()) {
+        hbaseConnection = getHBaseConnection();
+      }
+    }
+    try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+         BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
+      final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);
+
+      Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
+      Long currentTime = new Date().getTime();
+      Scan scan = new Scan();
+      scan.addFamily(SYSTEM_COLUMN_FAMILY);
+      scan.setTimeRange(rollbackTime, currentTime);
+      ResultScanner scanner = hTable.getScanner(scan);
+      Iterator<Result> scannerIterator = scanner.iterator();
+
+      List<Get> statements = new ArrayList<>();
+      List<Result> currentVersionResults = new ArrayList<Result>();
+      List<Mutation> mutations = new ArrayList<>();
+      while (scannerIterator.hasNext()) {
+        Result result = scannerIterator.next();
+        currentVersionResults.add(result);
+        statements.add(generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1));
+
+        if (scannerIterator.hasNext() && statements.size() < multiGetBatchSize) {
+          continue;
+        }
+        Result[] lastVersionResults = hTable.get(statements);
+        for (int i = 0; i < lastVersionResults.length; i++) {
+          Result lastVersionResult = lastVersionResults[i];
+          if (null == lastVersionResult.getRow() && rollbackSync) {
+            Result currentVersionResult = currentVersionResults.get(i);
+            Delete delete = new Delete(currentVersionResult.getRow());
+            mutations.add(delete);
+          }
+
+          if (null != lastVersionResult.getRow()) {
+            String oldPath = new String(lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+            String nowPath = new String(currentVersionResults.get(i).getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+            if (!oldPath.equals(nowPath) || rollbackSync) {
+              Put put = new Put(lastVersionResult.getRow());
+              put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
+              put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
+              put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+              mutations.add(put);
+            }
+          }
+        }
+        doMutations(mutator, mutations, limiter);
+        currentVersionResults.clear();
+        statements.clear();
+        mutations.clear();
+      }
+    } catch (Exception e) {
+      LOG.error("hbase index roll back failed", e);
+      return false;
+    }
     return true;
   }
```
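In short: the scan finds every index row whose cells were written at or after the rolled-back instant, and for each batch of `multiGetBatchSize` rows a time-ranged multi-get fetches each row's last pre-commit version. If no earlier version exists, the entry was created by the rolled-back commit and is deleted (when rollback sync is enabled); if one exists and the partition path changed (or rollback sync is on), the old commit timestamp, file name, and partition path are written back. That restore undoes the partition-path move that would otherwise leave the same record indexed under two partitions, which is the duplication described in HUDI-1347.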
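The new path only runs when rollback sync is enabled; otherwise `rollbackCommit` keeps the old behavior of deferring to `checkIfValidCommit()`. Below is a hedged sketch of opting in while building a write config. The builder method names (in particular `hbaseIndexRollbackSync`) are assumptions inferred from the config getter this PR calls (`getHBaseIndexRollbackSync`), and the ZooKeeper quorum, port, table name, and base path are placeholders, so verify the exact names against your Hudi version.

```java
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class RollbackSyncConfigSketch {
  public static void main(String[] args) {
    // A sketch only: builder names assumed from this PR's config getter
    // (getHBaseIndexRollbackSync); check them in your Hudi version.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // hypothetical base path
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.HBASE)
            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder()
                .hbaseZkQuorum("localhost")   // hypothetical ZK quorum
                .hbaseZkPort(2181)            // hypothetical ZK port
                .hbaseTableName("hudi_index") // hypothetical index table
                .hbaseIndexRollbackSync(true) // enable the new rollback path
                .build())
            .build())
        .build();
    System.out.println(writeConfig.getBasePath());
  }
}
```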