@@ -18,6 +18,7 @@

package org.apache.hudi.index.hbase;

import java.util.HashMap;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
@@ -307,6 +308,125 @@ public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
}

/*
* Test case to verify that for tagLocation entries present in HBase, if the corresponding commit instant is
* missing from the timeline and the commit is not archived, tagLocation resets the current record location to
* null. (A sketch of the underlying validity check follows this test.)
*/
@Test
public void testSimpleTagLocationWithInvalidCommit() throws Exception {
// set up the write config, index and write client
HoodieWriteConfig config = getConfig();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);

// make a first commit with 199 records
JavaRDD<HoodieRecord> writeRecords = generateAndCommitRecords(writeClient, 199);

// make a second commit with a single record
String invalidCommit = writeClient.startCommit();
JavaRDD<HoodieRecord> invalidWriteRecords = generateAndCommitRecords(writeClient, 1, invalidCommit);

// verify location is tagged.
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
JavaRDD<HoodieRecord> javaRDD0 = index.tagLocation(invalidWriteRecords, context(), hoodieTable);
assertEquals(1, javaRDD0.collect().size()); // one record present
assertEquals(1, javaRDD0.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); // it is tagged
assertEquals(invalidCommit, javaRDD0.collect().get(0).getCurrentLocation().getInstantTime());

// rollback the invalid commit, so that HBase is left with a stale entry.
writeClient.rollback(invalidCommit);

// Now tagLocation for the valid records; the HBase index should tag them
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(config, context, metaClient);
JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, context(), hoodieTable);
assertEquals(199, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());

// tagLocation for the invalid record - the commit is no longer present in the timeline due to the rollback.
JavaRDD<HoodieRecord> javaRDD2 = index.tagLocation(invalidWriteRecords, context(), hoodieTable);
assertEquals(1, javaRDD2.collect().size()); // one record present
assertEquals(0, javaRDD2.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); // it is not tagged
}
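
/*
 * For context: the validity check these tests exercise compares the commitTs tagged in HBase against the table's
 * completed commit timeline. A hedged sketch of that check (method name and exact signature are assumptions, not
 * necessarily the production HBaseIndex code):
 *
 *   private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
 *     HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
 *     // valid if the instant is on the commit timeline, or is older than the start of the timeline
 *     // (i.e. presumed archived)
 *     return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
 *   }
 */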

/*
* Test case to verify that tagLocation() uses the commit timeline to validate the commitTs stored in HBase.
* This test would fail if checkIfValidCommit() in HBaseIndex used incorrect timeline filtering.
*/
@Test
public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
// set up the write config, index and write client
HoodieWriteConfig config = getConfig();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);

String commitTime1 = writeClient.startCommit();
JavaRDD<HoodieRecord> writeRecords1 = generateAndCommitRecords(writeClient, 20, commitTime1);

// rollback the commit - this leaves a clean file in the timeline.
writeClient.rollback(commitTime1);

// create a second commit with 20 records
metaClient = HoodieTableMetaClient.reload(metaClient);
generateAndCommitRecords(writeClient, 20);

// Now tagLocation for the first set of rolled-back records; the HBase index should still tag them
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords1, context(), hoodieTable);
assertEquals(20, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
}
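
// Note the expectation above: after the rollback, commitTime1 is older than the first instant on the commit
// timeline, so the index treats it like an archived commit and considers the stale HBase entries valid. That is
// why all 20 records come back tagged even though commitTime1 itself was rolled back.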

private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient, int numRecs) throws Exception {
String commitTime = writeClient.startCommit();
return generateAndCommitRecords(writeClient, numRecs, commitTime);
}

private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient,
int numRecs, String commitTime) throws Exception {
// generate a batch of insert records for this commit
List<HoodieRecord> records = dataGen.generateInserts(commitTime, numRecs);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);

// Insert records
JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(writeRecords, commitTime);
assertNoWriteErrors(writeStatuses.collect());

// commit this upsert
writeClient.commit(commitTime, writeStatuses);

return writeRecords;
}
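
// Note: generateAndCommitRecords() commits explicitly via writeClient.commit(...). This assumes the write config
// used by these tests has auto-commit disabled; with auto-commit enabled, upsert() would already have completed
// the instant on its own.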

// Verify that the HBase index tags records belonging to an archived commit as valid.
@Test
public void testHbaseTagLocationForArchivedCommits() throws Exception {
// configure aggressive cleaning and archival so that the first commit can be archived quickly
Map<String, String> params = new HashMap<>();
params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1");
params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "3");
params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2");
HoodieWriteConfig config = getConfigBuilder(100, false).withProps(params).build();

SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);

// make the first commit with 20 records
JavaRDD<HoodieRecord> writeRecords1 = generateAndCommitRecords(writeClient, 20);

// Make 3 additional commits, so that the first commit is archived
for (int nCommit = 0; nCommit < 3; nCommit++) {
generateAndCommitRecords(writeClient, 20);
}
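// With MIN_COMMITS_TO_KEEP_PROP=2 and MAX_COMMITS_TO_KEEP_PROP=3 above, the fourth commit pushes the active
// timeline past the max, and the archiver trims it back to the min, moving the first commit to the archived
// timeline.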

// tagLocation for the first set of records (from the archived commit); the HBase index should tag them as valid
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords1, context(), hoodieTable);
assertEquals(20, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
}

@Test
public void testTotalGetsBatching() throws Exception {
HoodieWriteConfig config = getConfig();