diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index 2eb672a00bd4c..6a7a95821673a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.hbase;
 
+import avro.shaded.com.google.common.collect.Maps;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
@@ -307,6 +308,125 @@ public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
     assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
   }
 
+  /*
+   * Test case to verify that for tagLocation entries present in HBase, if the corresponding commit instant is missing
+   * from the timeline and the commit is not archived, tagLocation resets the current record location to null.
+   */
+  @Test
+  public void testSimpleTagLocationWithInvalidCommit() throws Exception {
+    // Load to memory
+    HoodieWriteConfig config = getConfig();
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+
+    String newCommitTime = writeClient.startCommit();
+    // make a commit with 199 records
+    JavaRDD<HoodieRecord> writeRecords = generateAndCommitRecords(writeClient, 199, newCommitTime);
+
+    // make a second commit with a single record
+    String invalidCommit = writeClient.startCommit();
+    JavaRDD<HoodieRecord> invalidWriteRecords = generateAndCommitRecords(writeClient, 1, invalidCommit);
+
+    // verify the record location is tagged with the second commit
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD0 = index.tagLocation(invalidWriteRecords, context, hoodieTable);
+    assertEquals(1, javaRDD0.collect().size()); // one record present
+    assertEquals(1, javaRDD0.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); // it is tagged
+    assertEquals(invalidCommit, javaRDD0.collect().get(0).getCurrentLocation().getInstantTime());
+
+    // rollback the invalid commit, so that HBase is left with a stale entry
+    writeClient.rollback(invalidCommit);
+
+    // Now tagLocation for the valid records; hbaseIndex should tag them
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, context, hoodieTable);
+    assertEquals(199, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+
+    // tagLocation for the invalid record - its commit is no longer present in the timeline due to the rollback
+    JavaRDD<HoodieRecord> javaRDD2 = index.tagLocation(invalidWriteRecords, context, hoodieTable);
+    assertEquals(1, javaRDD2.collect().size()); // one record present
+    assertEquals(0, javaRDD2.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); // it is not tagged
+  }
+
+  /*
+   * Test case to verify that tagLocation() uses the commit timeline to validate the commitTs stored in HBase.
+   * If checkIfValidCommit() in HBaseIndex applied incorrect timeline filtering, this test would fail.
+   */
+  @Test
+  public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
+    // Load to memory
+    HoodieWriteConfig config = getConfig();
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+
+    String commitTime1 = writeClient.startCommit();
+    JavaRDD<HoodieRecord> writeRecords1 = generateAndCommitRecords(writeClient, 20, commitTime1);
+
+    // rollback the commit - leaves a clean file in the timeline
+    writeClient.rollback(commitTime1);
+
+    // create a second commit with 20 records
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    generateAndCommitRecords(writeClient, 20);
+
+    // Now tagLocation for the first set of rolled-back records; hbaseIndex should tag them
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords1, context, hoodieTable);
+    assertEquals(20, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+  }
+
+  private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient, int numRecs) throws Exception {
+    String commitTime = writeClient.startCommit();
+    return generateAndCommitRecords(writeClient, numRecs, commitTime);
+  }
+
+  private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient,
+                                                         int numRecs, String commitTime) throws Exception {
+    // generate a batch of records for this commit
+    List<HoodieRecord> records = dataGen.generateInserts(commitTime, numRecs);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Insert records
+    JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(writeRecords, commitTime);
+    assertNoWriteErrors(writeStatuses.collect());
+
+    // commit this upsert
+    writeClient.commit(commitTime, writeStatuses);
+
+    return writeRecords;
+  }
+
+  // Verify that the HBase index tags records belonging to an archived commit as valid.
+  @Test
+  public void testHbaseTagLocationForArchivedCommits() throws Exception {
+    // Load to memory
+    Map<String, String> params = Maps.newHashMap();
+    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1");
+    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "3");
+    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2");
+    HoodieWriteConfig config = getConfigBuilder(100, false).withProps(params).build();
+
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+
+    // make first commit with 20 records
+    JavaRDD<HoodieRecord> writeRecords1 = generateAndCommitRecords(writeClient, 20);
+
+    // Make 3 additional commits, so that the first commit is archived
+    for (int nCommit = 0; nCommit < 3; nCommit++) {
+      generateAndCommitRecords(writeClient, 20);
+    }
+
+    // tagLocation for the first set of records (now in an archived commit); hbaseIndex should tag them as valid
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords1, context, hoodieTable);
+    assertEquals(20, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+  }
+
   @Test
   public void testTotalGetsBatching() throws Exception {
     HoodieWriteConfig config = getConfig();
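Note for reviewers: all three new tests pin down the same rule, namely the commit-validity check that tagLocation applies to the instant time read back from HBase (checkIfValidCommit, referenced in the test comments above). The sketch below shows a plausible shape of that rule; the method name and the HoodieTimeline helpers used here (getCommitsTimeline, filterCompletedInstants, containsOrBeforeTimelineStarts) are assumptions for illustration, not the actual implementation.

  // Sketch only - name and timeline helpers are assumptions, not the real HBaseIndex code.
  private boolean checkIfValidCommitSketch(HoodieTableMetaClient metaClient, String commitTs) {
    // Use only completed commits. Filtering the full timeline instead (which also holds
    // clean/rollback instants) is the bug testEnsureTagLocationUsesCommitTimeline guards against.
    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
    // Valid if the instant is in the active timeline, or is older than the first active
    // instant (i.e. archived) - the case covered by testHbaseTagLocationForArchivedCommits.
    return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
  }

A stale HBase entry whose instant satisfies neither clause (the rolled-back commit in testSimpleTagLocationWithInvalidCommit) fails the check, so tagLocation leaves the record's current location unset.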