@@ -38,6 +38,7 @@
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
@@ -61,12 +62,14 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.Tuple2;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudoRandomUUID;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -78,15 +81,21 @@
public class TestHoodieBloomIndex extends TestHoodieMetadataBase {

private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBloomIndex.class, "/exampleSchema.avsc", true);
private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}";
private static final String TEST_NAME_WITH_PARAMS =
"[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}, useMetadataTable={3}";
private static final Random RANDOM = new Random(0xDEED);

public static Stream<Arguments> configParams() {
// rangePruning, treeFiltering, bucketizedChecking
// rangePruning, treeFiltering, bucketizedChecking, useMetadataTable
Object[][] data = new Object[][] {
{true, true, true},
{false, true, true},
{true, true, false},
{true, false, true}
{true, true, true, false},
{false, true, true, false},
{true, true, false, false},
{true, false, true, false},
{true, true, true, true},
{false, true, true, true},
{true, true, false, true},
{true, false, true, true}
};
return Stream.of(data).map(Arguments::of);
}
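
The eight rows above are the four pre-existing flag combinations crossed with useMetadataTable in {false, true}. As a minimal sketch (not part of the PR), the same matrix could be computed as a cross product, using only the JUnit 5 Arguments API and java.util.Arrays that this test already imports:

private static final Object[][] FLAG_COMBOS = {
    {true, true, true}, {false, true, true}, {true, true, false}, {true, false, true}
};

public static Stream<Arguments> configParams() {
  // Emit useMetadataTable = false first, then true, matching the hand-written order above.
  return Stream.of(false, true).flatMap(useMetadataTable ->
      Arrays.stream(FLAG_COMBOS).map(flags ->
          Arguments.of(flags[0], flags[1], flags[2], useMetadataTable)));
}
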
@@ -110,24 +119,39 @@ public void tearDown() throws Exception {
cleanupResources();
}

private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
private HoodieWriteConfig makeConfig(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking, boolean useMetadataTable) {
// For the bloom index to use column stats and bloom filters from the metadata table,
// the following configs must all be set to true:
// "hoodie.bloom.index.use.metadata"
// "hoodie.metadata.enable" (true by default)
// "hoodie.metadata.index.column.stats.enable"
// "hoodie.metadata.index.bloom.filter.enable"
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning)
.bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking)
.bloomIndexKeysPerBucket(2).build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.bloomIndexPruneByRanges(rangePruning)
.bloomIndexTreebasedFilter(treeFiltering)
.bloomIndexBucketizedChecking(bucketizedChecking)
.bloomIndexKeysPerBucket(2)
.bloomIndexUseMetadata(useMetadataTable)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMetadataIndexBloomFilter(false)
.withMetadataIndexColumnStats(false)
.withMetadataIndexBloomFilter(useMetadataTable)
.withMetadataIndexColumnStats(useMetadataTable)
.build())
.build();
}
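
As a standalone illustration of the comment in makeConfig, here is a hedged sketch of a write config that enables the metadata-table-backed bloom index. It uses only builder methods visible in this diff, plus HoodieMetadataConfig.Builder#enable, which is an assumption about the builder API — verify against your Hudi version:

HoodieWriteConfig metadataBloomConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie_table")                 // hypothetical base path
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .bloomIndexUseMetadata(true)               // hoodie.bloom.index.use.metadata
        .build())
    .withMetadataConfig(HoodieMetadataConfig.newBuilder()
        .enable(true)                              // hoodie.metadata.enable (true by default)
        .withMetadataIndexBloomFilter(true)        // hoodie.metadata.index.bloom.filter.enable
        .withMetadataIndexColumnStats(true)        // hoodie.metadata.index.column.stats.enable
        .build())
    .build();

When useMetadataTable is false, the same index instead reads bloom filters and key ranges from the base file footers, which is the other half of what this parameter matrix exercises.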

@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
public void testLoadInvolvedFiles(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);

// Create some partitions and put some files in them
@@ -218,8 +242,11 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b

@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
public void testRangePruning(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) {
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());

final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
@@ -279,7 +306,7 @@ public void testCheckUUIDsAgainstOneFile() throws Exception {

final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
final String commitTime = "0000001";
final String fileId = UUID.randomUUID().toString();
final String fileId = genRandomUUID();

Path baseFilePath = testTable.forCommit(commitTime)
.withInserts(partition, fileId, Arrays.asList(record1, record2));
@@ -317,11 +344,14 @@ public void testCheckUUIDsAgainstOneFile() throws Exception {

@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testTagLocationWithEmptyRDD(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
public void testTagLocationWithEmptyRDD(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) {
// An empty RDD of records to tag
JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);

@@ -335,11 +365,13 @@ public void testTagLocationWithEmptyRDD(boolean rangePruning, boolean treeFilter

@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
public void testTagLocationOnPartitionedTable(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
// We have some records to be tagged (two different partitions)
String rowKey1 = UUID.randomUUID().toString();
String rowKey2 = UUID.randomUUID().toString();
String rowKey3 = UUID.randomUUID().toString();
String rowKey1 = genRandomUUID();
String rowKey2 = genRandomUUID();
String rowKey3 = genRandomUUID();
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
@@ -360,8 +392,9 @@ public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));

// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);

// Let's tag
Expand All @@ -378,7 +411,7 @@ public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean
final String partition2 = "2015/01/31";

// We create three parquet files, each having one record (two different partitions)
final String fileId1 = UUID.randomUUID().toString();
final String fileId1 = genRandomUUID();
final String commit1 = "0000001";
Path baseFilePath = testTable.forCommit(commit1).withInserts(partition1, fileId1, Collections.singletonList(record1));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
@@ -387,7 +420,7 @@ public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean
testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(partition1),
partitionToFilesNameLengthMap, false, false);

final String fileId2 = UUID.randomUUID().toString();
final String fileId2 = genRandomUUID();
final String commit2 = "0000002";
baseFilePath = testTable.forCommit(commit2).withInserts(partition1, fileId2, Collections.singletonList(record2));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
@@ -397,7 +430,7 @@ public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean
testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(partition1),
partitionToFilesNameLengthMap, false, false);

final String fileId3 = UUID.randomUUID().toString();
final String fileId3 = genRandomUUID();
final String commit3 = "0000003";
baseFilePath = testTable.forCommit(commit3).withInserts(partition2, fileId3, Collections.singletonList(record4));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
@@ -408,6 +441,7 @@ public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean
partitionToFilesNameLengthMap, false, false);

// We do the tag again
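// Reload the meta client so the commits written above (and their metadata-table updates) are visible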
metaClient = HoodieTableMetaClient.reload(metaClient);
taggedRecordRDD = tagLocation(bloomIndex, recordRDD, HoodieSparkTable.create(config, context, metaClient));

// Check results
@@ -428,7 +462,99 @@ public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean

@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
public void testTagLocationOnNonpartitionedTable(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
// We have some records to be tagged (non-partitioned table, so a single empty partition path)
String rowKey1 = genRandomUUID();
String rowKey2 = genRandomUUID();
String rowKey3 = genRandomUUID();
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";

String emptyPartitionPath = "";
RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
HoodieRecord record1 =
new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), emptyPartitionPath), rowChange1);
RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
HoodieRecord record2 =
new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), emptyPartitionPath), rowChange2);
RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
HoodieRecord record3 =
new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), emptyPartitionPath), rowChange3);

JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3));

// Also create the metadata and config
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);

// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(bloomIndex, recordRDD, hoodieTable);

// Should not find any files
for (HoodieRecord record : taggedRecordRDD.collect()) {
assertFalse(record.isCurrentLocationKnown());
}

final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();

// We create three parquet files, each having one record
final String fileId1 = genRandomUUID();
final String commit1 = "0000001";
Path baseFilePath = testTable.forCommit(commit1).withInserts(emptyPartitionPath, fileId1, Collections.singletonList(record1));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(emptyPartitionPath,
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(emptyPartitionPath),
partitionToFilesNameLengthMap, false, false);

final String fileId2 = genRandomUUID();
final String commit2 = "0000002";
baseFilePath = testTable.forCommit(commit2).withInserts(emptyPartitionPath, fileId2, Collections.singletonList(record2));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(emptyPartitionPath,
k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(emptyPartitionPath),
partitionToFilesNameLengthMap, false, false);

final String fileId3 = genRandomUUID();
final String commit3 = "0000003";
baseFilePath = testTable.forCommit(commit3).withInserts(emptyPartitionPath, fileId3, Collections.singletonList(record3));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(emptyPartitionPath,
k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit3, WriteOperationType.UPSERT, Collections.singletonList(emptyPartitionPath),
partitionToFilesNameLengthMap, false, false);

// We do the tag again
metaClient = HoodieTableMetaClient.reload(metaClient);
taggedRecordRDD = tagLocation(bloomIndex, recordRDD, HoodieSparkTable.create(config, context, metaClient));

// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getRecordKey().equals(rowKey1)) {
assertEquals(fileId1, record.getCurrentLocation().getFileId());
} else if (record.getRecordKey().equals(rowKey2)) {
assertEquals(fileId2, record.getCurrentLocation().getFileId());
} else if (record.getRecordKey().equals(rowKey3)) {
assertEquals(fileId3, record.getCurrentLocation().getFileId());
}
}
}

@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testCheckExists(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
// We have some records to be tagged (two different partitions)

String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -454,8 +580,10 @@ public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean
JavaRDD<HoodieKey> keysRDD = jsc.parallelize(Arrays.asList(key1, key2, key3, key4));

// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);

// Let's tag
@@ -475,9 +603,9 @@ public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean

final String partition1 = "2016/01/31";
final String partition2 = "2015/01/31";
final String fileId1 = UUID.randomUUID().toString();
final String fileId2 = UUID.randomUUID().toString();
final String fileId3 = UUID.randomUUID().toString();
final String fileId1 = genRandomUUID();
final String fileId2 = genRandomUUID();
final String fileId3 = genRandomUUID();
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
// We create three parquet files, each having one record (two different partitions)
final String commit1 = "0000001";
@@ -536,7 +664,9 @@ public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean

@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
public void testBloomFilterFalseError(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
// We have two hoodie records
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -561,7 +691,8 @@ public void testBloomFilterFalseError(boolean rangePruning, boolean treeFilterin

// We do the tag
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);

@@ -577,4 +708,8 @@ public void testBloomFilterFalseError(boolean rangePruning, boolean treeFilterin
}
}
}

private static String genRandomUUID() {
return genPseudoRandomUUID(RANDOM).toString();
}
}
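
A closing note on the seeded randomness: because RANDOM is constructed with the fixed seed 0xDEED, every key and file ID that flows through genRandomUUID() is reproducible across runs, which makes failures in the new metadata-table code paths easier to replay. The sketch below is a plausible shape for the imported genPseudoRandomUUID — an assumption for illustration only; the real helper lives in HoodieTestDataGenerator:

// Hypothetical sketch: derive a deterministic version-4 UUID from a seeded Random.
static UUID genPseudoRandomUUID(Random random) {
  byte[] bytes = new byte[16];
  random.nextBytes(bytes);
  bytes[6] = (byte) ((bytes[6] & 0x0f) | 0x40);  // set the version nibble to 4
  bytes[8] = (byte) ((bytes[8] & 0x3f) | 0x80);  // set the IETF variant bits
  java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(bytes);
  return new UUID(buf.getLong(), buf.getLong());
}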