diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 5d79776b19b83..7a8d9535d48d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -113,6 +113,13 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
   public static final String HBASE_INDEX_UPDATE_PARTITION_PATH = "hoodie.hbase.index.update.partition.path";
   public static final Boolean DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH = false;
 
+  /**
+   * When set to true, the rollback method will delete the index entries written by the commit being rolled back.
+   * The default is false, because deleting index entries adds extra load on the HBase cluster for every rollback.
+   */
+  public static final String HBASE_INDEX_ROLLBACK_SYNC = "hoodie.index.hbase.rollback.sync";
+  public static final Boolean DEFAULT_HBASE_INDEX_ROLLBACK_SYNC = false;
+
   public HoodieHBaseIndexConfig(final Properties props) {
     super(props);
   }
@@ -212,6 +219,11 @@ public Builder hbaseIndexUpdatePartitionPath(boolean updatePartitionPath) {
       return this;
     }
 
+    public Builder hbaseIndexRollbackSync(boolean rollbackSync) {
+      props.setProperty(HBASE_INDEX_ROLLBACK_SYNC, String.valueOf(rollbackSync));
+      return this;
+    }
+
     public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
       props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
       return this;
     }
@@ -277,6 +289,8 @@ public HoodieHBaseIndexConfig build() {
           String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
       setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_UPDATE_PARTITION_PATH), HBASE_INDEX_UPDATE_PARTITION_PATH,
           String.valueOf(DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH));
+      setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_ROLLBACK_SYNC), HBASE_INDEX_ROLLBACK_SYNC,
+          String.valueOf(DEFAULT_HBASE_INDEX_ROLLBACK_SYNC));
       return config;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 24ba10954633d..ee0e3fa3a6287 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -557,6 +557,10 @@ public int getHbaseIndexGetBatchSize() {
     return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
   }
 
+  public Boolean getHBaseIndexRollbackSync() {
+    return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_ROLLBACK_SYNC));
+  }
+
   public int getHbaseIndexPutBatchSize() {
     return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 22ee65cc302f5..c55e5e7c94157 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.index.hbase;
 
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkMemoryUtils;
@@ -28,6 +30,7 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.RateLimiter;
@@ -67,6 +70,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -181,6 +185,10 @@ private Get generateStatement(String key) throws IOException {
         .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
   }
 
+  private Get generateStatement(String key, long startTime, long endTime) throws IOException {
+    return generateStatement(key).setTimeRange(startTime, endTime);
+  }
+
   private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
     HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
     // Check if the last commit ts for this row is 1) present in the timeline or
@@ -537,7 +545,72 @@ private Integer getNumRegionServersAliveForTable() {
 
   @Override
   public boolean rollbackCommit(String instantTime) {
-    // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
+    int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
+    boolean rollbackSync = config.getHBaseIndexRollbackSync();
+
+    if (!config.getHBaseIndexRollbackSync()) {
+      // Default rollback in HBaseIndex is managed via method {@link #checkIfValidCommit()}
+      return true;
+    }
+
+    synchronized (SparkHoodieHBaseIndex.class) {
+      if (hbaseConnection == null || hbaseConnection.isClosed()) {
+        hbaseConnection = getHBaseConnection();
+      }
+    }
+    try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+         BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
+      final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);
+
+      Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
+      Long currentTime = new Date().getTime();
+      Scan scan = new Scan();
+      scan.addFamily(SYSTEM_COLUMN_FAMILY);
+      scan.setTimeRange(rollbackTime, currentTime);
+      ResultScanner scanner = hTable.getScanner(scan);
+      Iterator<Result> scannerIterator = scanner.iterator();
+
+      List<Get> statements = new ArrayList<>();
+      List<Result> currentVersionResults = new ArrayList<>();
+      List<Mutation> mutations = new ArrayList<>();
+      while (scannerIterator.hasNext()) {
+        Result result = scannerIterator.next();
+        currentVersionResults.add(result);
+        statements.add(generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1));
+
+        if (scannerIterator.hasNext() && statements.size() < multiGetBatchSize) {
+          continue;
+        }
+        Result[] lastVersionResults = hTable.get(statements);
+        for (int i = 0; i < lastVersionResults.length; i++) {
+          Result lastVersionResult = lastVersionResults[i];
+          if (null == lastVersionResult.getRow() && rollbackSync) {
+            Result currentVersionResult = currentVersionResults.get(i);
+            Delete delete = new Delete(currentVersionResult.getRow());
+            mutations.add(delete);
+          }
+
+          if (null != lastVersionResult.getRow()) {
+            String oldPath = new String(lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+            String nowPath = new String(currentVersionResults.get(i).getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+            if (!oldPath.equals(nowPath) || rollbackSync) {
+              Put put = new Put(lastVersionResult.getRow());
+              put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
+              put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
+              put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+              mutations.add(put);
+            }
+          }
+        }
+        doMutations(mutator, mutations, limiter);
+        currentVersionResults.clear();
+        statements.clear();
+        mutations.clear();
+      }
+    } catch (Exception e) {
+      LOG.error("HBase index rollback failed", e);
+      return false;
+    }
     return true;
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index c99b79c900a17..31ad241a3d272 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -117,7 +117,7 @@ public static void init() throws Exception {
     utility = new HBaseTestingUtility(hbaseConfig);
     utility.startMiniCluster();
     hbaseConfig = utility.getConnection().getConfiguration();
-    utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"));
+    utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"), 2);
   }
 
   @BeforeEach
@@ -198,8 +198,8 @@ public void testTagLocationAndPartitionPathUpdate() throws Exception {
 
     JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
     JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
 
-    HoodieWriteConfig config = getConfig(true);
-    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true));
+    HoodieWriteConfig config = getConfig(true, false);
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true, false));
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
       // allowed path change test
@@ -225,7 +225,7 @@ public void testTagLocationAndPartitionPathUpdate() throws Exception {
       assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
 
       // not allowed path change test
-      index = new SparkHoodieHBaseIndex<>(getConfig(false));
+      index = new SparkHoodieHBaseIndex<>(getConfig(false, false));
       List<HoodieRecord> notAllowPathChangeRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
       assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
       assertEquals(numRecords, taggedRecords.stream().filter(hoodieRecord -> hoodieRecord.isCurrentLocationKnown()
@@ -272,6 +272,66 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
         && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
   }
 
+  @Test
+  public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws Exception {
+    final int numRecords = 10;
+    final String oldPartitionPath = "1970/01/01";
+    final String emptyHoodieRecordPayloadClassName = EmptyHoodieRecordPayload.class.getName();
+    HoodieWriteConfig config = getConfig(true, true);
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
+      final String firstCommitTime = writeClient.startCommit();
+      List<HoodieRecord> newRecords = dataGen.generateInserts(firstCommitTime, numRecords);
+      List<HoodieRecord> oldRecords = new LinkedList<>();
+      for (HoodieRecord newRecord : newRecords) {
+        HoodieKey key = new HoodieKey(newRecord.getRecordKey(), oldPartitionPath);
+        HoodieRecord hoodieRecord = new HoodieRecord(key, newRecord.getData());
+        oldRecords.add(hoodieRecord);
+      }
+      JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
+      JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
+      // first commit the old records
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+      List<HoodieRecord> beforeFirstTaggedRecords = index.tagLocation(oldWriteRecords, context, hoodieTable).collect();
+      JavaRDD<WriteStatus> oldWriteStatues = writeClient.upsert(oldWriteRecords, firstCommitTime);
+      index.updateLocation(oldWriteStatues, context, hoodieTable);
+      writeClient.commit(firstCommitTime, oldWriteStatues);
+      List<HoodieRecord> afterFirstTaggedRecords = index.tagLocation(oldWriteRecords, context, hoodieTable).collect();
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+      final String secondCommitTime = writeClient.startCommit();
+      List<HoodieRecord> beforeSecondTaggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+      JavaRDD<WriteStatus> newWriteStatues = writeClient.upsert(newWriteRecords, secondCommitTime);
+      index.updateLocation(newWriteStatues, context, hoodieTable);
+      writeClient.commit(secondCommitTime, newWriteStatues);
+      List<HoodieRecord> afterSecondTaggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+      writeClient.rollback(secondCommitTime);
+      List<HoodieRecord> afterRollback = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+
+      // Verify the first commit
+      assertEquals(numRecords, beforeFirstTaggedRecords.stream().filter(record -> record.getCurrentLocation() == null).count());
+      assertEquals(numRecords, afterFirstTaggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
+      // Verify the second commit
+      assertEquals(numRecords, beforeSecondTaggedRecords.stream()
+          .filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
+              && record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
+      assertEquals(numRecords * 2, beforeSecondTaggedRecords.stream().count());
+      assertEquals(numRecords, afterSecondTaggedRecords.stream().count());
+      assertEquals(numRecords, afterSecondTaggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+      // Verify the rollback
+      // If an exception occurs after HBase has written the index and the index is not rolled back,
+      // the currentLocation information will not be returned.
+      assertEquals(numRecords, afterRollback.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
+          && record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
+      assertEquals(numRecords * 2, beforeSecondTaggedRecords.stream().count());
+      assertEquals(numRecords, afterRollback.stream().filter(HoodieRecord::isCurrentLocationKnown)
+          .filter(record -> record.getCurrentLocation().getInstantTime().equals(firstCommitTime)).count());
+    }
+  }
+
   @Test
   public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
     // Load to memory
@@ -413,7 +473,7 @@ public void testHbaseTagLocationForArchivedCommits() throws Exception {
     params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1");
     params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "3");
     params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2");
-    HoodieWriteConfig config = getConfigBuilder(100, false).withProps(params).build();
+    HoodieWriteConfig config = getConfigBuilder(100, false, false).withProps(params).build();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
 
@@ -723,18 +783,18 @@ private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
   }
 
   private HoodieWriteConfig getConfig() {
-    return getConfigBuilder(100, false).build();
+    return getConfigBuilder(100, false, false).build();
   }
 
   private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
-    return getConfigBuilder(hbaseIndexBatchSize, false).build();
+    return getConfigBuilder(hbaseIndexBatchSize, false, false).build();
   }
 
-  private HoodieWriteConfig getConfig(boolean updatePartitionPath) {
-    return getConfigBuilder(100, updatePartitionPath).build();
+  private HoodieWriteConfig getConfig(boolean updatePartitionPath, boolean rollbackSync) {
+    return getConfigBuilder(100, updatePartitionPath, rollbackSync).build();
   }
 
-  private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath) {
+  private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath, boolean rollbackSync) {
     return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(1, 1).withDeleteParallelism(1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
@@ -749,6 +809,7 @@ private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath) {
             .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
             .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
             .hbaseIndexUpdatePartitionPath(updatePartitionPath)
+            .hbaseIndexRollbackSync(rollbackSync)
             .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
         .build());
   }
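Usage note: the sketch below shows how a writer could opt into the new synchronous index rollback through the builder method added by this patch. It is illustrative only; the base path, ZooKeeper quorum and port, and HBase table name are placeholder values, not part of the change.

    import org.apache.hudi.config.HoodieHBaseIndexConfig;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")            // placeholder base path
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.HBASE)
            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder()
                .hbaseZkQuorum("localhost")       // placeholder ZooKeeper quorum
                .hbaseZkPort(2181)                // placeholder ZooKeeper port
                .hbaseTableName("hudi_index")     // placeholder HBase index table
                // Opt in: rollbackCommit() will delete or restore index entries of the rolled-back instant.
                // With the default (false), rollback keeps relying on checkIfValidCommit() filtering at tag time.
                .hbaseIndexRollbackSync(true)
                .build())
            .build())
        .build();

Equivalently, the raw property "hoodie.index.hbase.rollback.sync" introduced above can be set to "true" in the writer properties.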