@@ -40,7 +40,8 @@ object SparkHelpers {
def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile)
val schema: Schema = sourceRecords.get(0).getSchema
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt,
HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_ENABLE_DYNAMIC.toBoolean)
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
@@ -122,7 +123,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
*/
def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = {
val bfStr = SparkHelpers.getBloomFilter(file, conf)
val bf = new com.uber.hoodie.common.BloomFilter(bfStr)
val bf = new com.uber.hoodie.common.BloomFilter(bfStr, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_ENABLE_DYNAMIC.toBoolean)
val foundCount = sqlContext.parquetFile(file)
.select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
.collect().count(r => !bf.mightContain(r.getString(0)))
@@ -50,6 +50,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL =
"hoodie.bloom.index.input.storage" + ".level";
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
public static final String BLOOM_INDEX_ENABLE_DYNAMIC_PROP =
"hoodie.bloom.index.dynamic";
public static final String DEFAULT_BLOOM_INDEX_ENABLE_DYNAMIC = "false";

Member: Rename to "hoodie.bloom.index.dynamic.bloomfilter" or "hoodie.bloom.index.auto.tune.bloomfilter"? To make it clearer it's about the bloom filter and not the index checking. Also, this should probably belong in the storage config, since it's about how we write the parquet file, right?

Contributor Author: I suggest we leave it in the IndexConfig, since logically that's where people would look for all Index/BloomFilter related settings?

Member: Depends on how you look at it. At the code level, it's weird to suddenly access an index config in storage. We can leave it here for now, but let's rename?


private HoodieIndexConfig(Properties props) {
super(props);
@@ -333,6 +333,10 @@ public StorageLevel getBloomIndexInputStorageLevel() {
.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
}

public boolean getEnableDynamicBloomIndex() {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_ENABLE_DYNAMIC_PROP));
}
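Note (not part of the diff): a minimal sketch of how the new flag would be switched on, assuming the usual Properties-based plumbing. Only the property key and the accessor above come from this PR; how the Properties reach the write config in a concrete job is assumed.

Properties props = new Properties();
// BLOOM_INDEX_ENABLE_DYNAMIC_PROP, i.e. "hoodie.bloom.index.dynamic"; defaults to "false"
props.setProperty(HoodieIndexConfig.BLOOM_INDEX_ENABLE_DYNAMIC_PROP, "true");
// Once these props back the HoodieWriteConfig, getEnableDynamicBloomIndex() returns true
// and HoodieBloomIndexCheckFunction deserializes the parquet footer as a DynamicBloomFilter.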

/**
* storage properties
**/
@@ -342,7 +342,7 @@ JavaPairRDD<String, String> findMatchingFilesForRecordKeys(
.sortByKey(true, joinParallelism);

return fileSortedTripletRDD.mapPartitionsWithIndex(
new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
new HoodieBloomIndexCheckFunction(metaClient, config), true)
.flatMap(List::iterator)
.filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> {
@@ -23,6 +23,7 @@
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.HoodieTimer;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.func.LazyIterableIterator;
@@ -48,13 +49,13 @@ public class HoodieBloomIndexCheckFunction implements

private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class);

private final String basePath;
private final HoodieWriteConfig config;

private final HoodieTableMetaClient metaClient;

public HoodieBloomIndexCheckFunction(HoodieTableMetaClient metaClient, String basePath) {
public HoodieBloomIndexCheckFunction(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
this.metaClient = metaClient;
this.basePath = basePath;
this.config = config;
}

/**
@@ -118,9 +119,10 @@ protected void start() {

private void initState(String fileName, String partitionPath) throws HoodieIndexException {
try {
Path filePath = new Path(basePath + "/" + partitionPath + "/" + fileName);
Path filePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + fileName);
HoodieTimer timer = new HoodieTimer().startTimer();
bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath);
bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath, config
.getEnableDynamicBloomIndex());
logger.info(String.format("Read bloom filter from %s/%s in %d ms", partitionPath, fileName, timer.endTimer()));
candidateRecordKeys = new ArrayList<>();
currentFile = fileName;
@@ -144,7 +146,7 @@ private void checkAndAddCandidates(String recordKey) {
}

private List<String> checkAgainstCurrentFile() {
Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile);
Path filePath = new Path(config.getBasePath() + "/" + currentPartitionPath + "/" + currentFile);
if (logger.isDebugEnabled()) {
logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys);
}
@@ -41,7 +41,7 @@ public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieSto
R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(String commitTime, Path path,
HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable) throws IOException {
BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(),
config.getBloomFilterFPP());
config.getBloomFilterFPP(), false);
Member: Shouldn't we pass the config here as well?

Contributor Author: Good catch.

Member: @n3nash without the config here, we would not actually have written dynamic filters at all during the tests?

Contributor Author: I ran tests isolated only on the bloom filter and generated UUIDs + random strings (to simulate non-UUID based keys); the tests don't go through this code path.

HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(schema), schema, filter);
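Note (not part of the diff): following up on the thread above, passing the config through would presumably make the constructor call read roughly like the sketch below. getBloomFilterNumEntries(), getBloomFilterFPP() and the new getEnableDynamicBloomIndex() all exist in this PR; treating this as the eventual fix is an assumption.

BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(),
    config.getBloomFilterFPP(), config.getEnableDynamicBloomIndex());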

@@ -196,7 +196,7 @@ public static String writeParquetFile(String basePath,
boolean createCommitTime) throws IOException {

if (filter == null) {
filter = new BloomFilter(10000, 0.0000001);
filter = new BloomFilter(10000, 0.0000001, false);
}
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema,
filter);
@@ -265,7 +265,7 @@ public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedExcept

// We write record1, record2 to a parquet file, but the bloom filter contains (record1,
// record2, record3).
BloomFilter filter = new BloomFilter(10000, 0.0000001);
BloomFilter filter = new BloomFilter(10000, 0.0000001, false);
filter.add(record3.getRecordKey());
String filename = HoodieClientTestUtils
.writeParquetFile(basePath, "2016/01/31",
@@ -479,7 +479,7 @@ public void testBloomFilterFalseError() throws IOException, InterruptedException
HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()),
rowChange2);

BloomFilter filter = new BloomFilter(10000, 0.0000001);
BloomFilter filter = new BloomFilter(10000, 0.0000001, false);
filter.add(record2.getRecordKey());
String filename = HoodieClientTestUtils
.writeParquetFile(basePath, "2016/01/31",
@@ -155,7 +155,8 @@ public void testUpdateRecords() throws Exception {

// Read out the bloom filter and make sure filter can answer record exist or not
Path parquetFilePath = new Path(parquetFile.getAbsolutePath());
BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath);
BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath,
false);
for (HoodieRecord record : records) {
assertTrue(filter.mightContain(record.getRecordKey()));
}
@@ -211,7 +212,7 @@ public void testUpdateRecords() throws Exception {
// Check whether the record has been updated
Path updatedParquetFilePath = new Path(updatedParquetFile.getAbsolutePath());
BloomFilter updatedFilter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(),
updatedParquetFilePath);
updatedParquetFilePath, false);
for (HoodieRecord record : records) {
// No change to the _row_key
assertTrue(updatedFilter.mightContain(record.getRecordKey()));
53 changes: 41 additions & 12 deletions hoodie-common/src/main/java/com/uber/hoodie/common/BloomFilter.java
@@ -28,7 +28,8 @@
import org.apache.hadoop.util.hash.Hash;

/**
* A Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter}.
* A Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter} or
* {@link org.apache.hadoop.util.bloom.DynamicBloomFilter}.
*/
public class BloomFilter {

@@ -37,33 +38,49 @@ public class BloomFilter {
*/
public static final double LOG2_SQUARED = Math.log(2) * Math.log(2);

private org.apache.hadoop.util.bloom.BloomFilter filter = null;
private org.apache.hadoop.util.bloom.BloomFilter bloomFilter = null;

public BloomFilter(int numEntries, double errorRate) {
this(numEntries, errorRate, Hash.MURMUR_HASH);
private org.apache.hadoop.util.bloom.DynamicBloomFilter dynamicBloomFilter = null;
Member: Would be curious to see which other projects use this, if it's easy to do that.

Contributor Author: HBase -> https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/util/BloomFilter.html
Looked through the HBase code and they have implemented their own BloomFilters based on the algorithms in the above class.
Cassandra -> https://docs.datastax.com/en/cassandra/3.0/cassandra/operations/opsTuningBloomFilters.html
They don't support dynamic bloom filters; instead they force rewriting the bloom filter.

private boolean isDynamic;

public BloomFilter(int numEntries, double errorRate, boolean isDynamic) {
this(numEntries, errorRate, Hash.MURMUR_HASH, isDynamic);
}

/**
* Create a new Bloom filter with the given configurations.
*/
public BloomFilter(int numEntries, double errorRate, int hashType) {
public BloomFilter(int numEntries, double errorRate, int hashType, boolean isDynamic) {
// Bit size
int bitSize = (int) Math.ceil(numEntries * (-Math.log(errorRate) / LOG2_SQUARED));
// Number of the hash functions
int numHashs = (int) Math.ceil(Math.log(2) * bitSize / numEntries);
// The filter
this.filter = new org.apache.hadoop.util.bloom.BloomFilter(bitSize, numHashs, hashType);
if (isDynamic) {
this.dynamicBloomFilter = new org.apache.hadoop.util.bloom.DynamicBloomFilter(bitSize, numHashs, hashType,
numEntries);
} else {
this.bloomFilter = new org.apache.hadoop.util.bloom.BloomFilter(bitSize, numHashs, hashType);
}
this.isDynamic = isDynamic;
}
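Note (not part of the diff): to make the sizing above concrete, with the values used in the tests further down (numEntries = 10000, errorRate = 0.0000001), bitSize = ceil(10000 * (-ln(1e-7) / ln(2)^2)) ≈ 335,477 bits (roughly 41 KB) and numHashs = ceil(ln(2) * 335,477 / 10000) = 24. In the dynamic case the same per-row sizing applies; the Hadoop DynamicBloomFilter is expected to add another internal filter row once numEntries keys have been inserted, which is what keeps the false-positive rate bounded as more keys arrive.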

/**
* Create the bloom filter from serialized string.
*/
public BloomFilter(String filterStr) {
this.filter = new org.apache.hadoop.util.bloom.BloomFilter();
public BloomFilter(String filterStr, boolean isDynamic) {
this.isDynamic = isDynamic;
byte[] bytes = DatatypeConverter.parseBase64Binary(filterStr);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
try {
this.filter.readFields(dis);
if (isDynamic) {
this.dynamicBloomFilter = new org.apache.hadoop.util.bloom.DynamicBloomFilter();
this.dynamicBloomFilter.readFields(dis);
} else {
this.bloomFilter = new org.apache.hadoop.util.bloom.BloomFilter();
this.bloomFilter.readFields(dis);
}
dis.close();
} catch (IOException e) {
throw new HoodieIndexException("Could not deserialize BloomFilter instance", e);
@@ -74,14 +91,22 @@ public void add(String key) {
if (key == null) {
throw new NullPointerException("Key cannot by null");
}
filter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
if (isDynamic) {
this.dynamicBloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
} else {
this.bloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
}
}

public boolean mightContain(String key) {
if (key == null) {
throw new NullPointerException("Key cannot by null");
}
return filter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
if (isDynamic) {
return this.dynamicBloomFilter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
} else {
return this.bloomFilter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
}
}

/**
@@ -91,7 +116,11 @@ public String serializeToString() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
try {
filter.write(dos);
if (isDynamic) {
dynamicBloomFilter.write(dos);
} else {
bloomFilter.write(dos);
}
byte[] bytes = baos.toByteArray();
dos.close();
return DatatypeConverter.printBase64Binary(bytes);
@@ -146,10 +146,10 @@ public static Schema readAvroSchema(Configuration configuration, Path parquetFil
* Read out the bloom filter from the parquet file meta data.
*/
public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration,
Path parquetFilePath) {
Path parquetFilePath, boolean enableDynamicBloomFilter) {
String footerVal = readParquetFooter(configuration, parquetFilePath,
HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY).get(0);
return new BloomFilter(footerVal);
return new BloomFilter(footerVal, enableDynamicBloomFilter);
Member: I am assuming this is needed because only then we'd know the type of bloom filter and deserialize properly? Can BloomFilter read a DynamicBloomFilter serialized value? What happens when we have a mix of files written using normal and dynamic filters? Should we resolve this using an additional footer, instead of making this a writer side config?

Contributor Author: Have some thoughts around this, will discuss f2f.

}
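Note (not part of the diff): the reviewer's footer-based idea above could look roughly like the sketch below. The extra footer key name and the helper that reads it are assumptions for illustration only; this PR instead threads the boolean through as a writer/reader-side config.

public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration,
    Path parquetFilePath) {
  String footerVal = readParquetFooter(configuration, parquetFilePath,
      HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY).get(0);
  // Hypothetical extra footer written alongside the serialized filter; files written
  // before this key existed would fall back to the plain BloomFilter.
  String filterType = readOptionalFooterValue(configuration, parquetFilePath,
      "com.uber.hoodie.bloomfilter.type.code"); // assumed key name and assumed helper
  boolean isDynamic = "DYNAMIC".equals(filterType);
  return new BloomFilter(footerVal, isDynamic);
}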

public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) {
@@ -0,0 +1,53 @@
package com.uber.hoodie.common;/*
* Copyright (c) 2019 Uber Technologies, Inc. ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.io.IOException;
import java.util.UUID;

public abstract class AbstractBloomFilter {
Member: Rename: AbstractBloomFilterTest.

Contributor Author: Done.


protected void testAddKey(boolean isDynamic) {
BloomFilter filter = new BloomFilter(100, 0.0000001, isDynamic);
filter.add("key1");
assert (filter.mightContain("key1"));
}

protected int testOverloadBloom(boolean isDynamic) {
BloomFilter filter = new BloomFilter(10, 0.0000001, isDynamic);
for (int i = 0; i < 100; i++) {
filter.add(UUID.randomUUID().toString());
}
int falsePositives = 0;
for (int i = 0; i < 100; i++) {
if (filter.mightContain(UUID.randomUUID().toString())) {
falsePositives++;
}
}
return falsePositives;
}

protected void testSerialize(boolean isDynamic) throws IOException, ClassNotFoundException {
BloomFilter filter = new BloomFilter(1000, 0.0000001, isDynamic);
filter.add("key1");
filter.add("key2");
String filterStr = filter.serializeToString();

// Rebuild
BloomFilter newFilter = new BloomFilter(filterStr, isDynamic);
assert (newFilter.mightContain("key1"));
assert (newFilter.mightContain("key2"));
}
}
@@ -19,25 +19,21 @@
import java.io.IOException;
import org.junit.Test;

public class TestBloomFilter {
public class TestBloomFilter extends AbstractBloomFilter {

@Test
public void testAddKey() {
BloomFilter filter = new BloomFilter(100, 0.0000001);
filter.add("key1");
assert (filter.mightContain("key1"));
testAddKey(false);
}

@Test
public void testSerialize() throws IOException, ClassNotFoundException {
BloomFilter filter = new BloomFilter(1000, 0.0000001);
filter.add("key1");
filter.add("key2");
String filterStr = filter.serializeToString();
public void testOverloadBloom() {
int falsePositives = testOverloadBloom(false);
assert (falsePositives > 0);
}

// Rebuild
BloomFilter newFilter = new BloomFilter(filterStr);
assert (newFilter.mightContain("key1"));
assert (newFilter.mightContain("key2"));
@Test
public void testSerialize() throws IOException, ClassNotFoundException {
testSerialize(false);
}
}
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common;

import java.io.IOException;
import org.junit.Test;

public class TestDynamicBloomFilter extends AbstractBloomFilter {

@Test
public void testAddKey() {
testAddKey(true);
}

@Test
public void testOverloadBloom() {
int falsePositives = testOverloadBloom(true);
assert (falsePositives == 0);
}

@Test
public void testSerialize() throws IOException, ClassNotFoundException {
testSerialize(true);
}

}
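Note (not part of the diff): the contrasting assertions in the two overload tests are the point of the change. TestBloomFilter sizes a plain filter for 10 entries, inserts 100 keys, and expects falsePositives > 0, since the saturated bit vector can no longer discriminate; TestDynamicBloomFilter does the same with the dynamic variant and expects falsePositives == 0, because it keeps adding internal filter rows as the key count passes the configured threshold.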