diff --git a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala
index 81db77053274d..fe0e6719f84d5 100644
--- a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala
+++ b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala
@@ -40,7 +40,8 @@ object SparkHelpers {
   def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
     val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile)
     val schema: Schema = sourceRecords.get(0).getSchema
-    val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
+    val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt,
+      HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_ENABLE_DYNAMIC.toBoolean)
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
     val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
     val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
@@ -122,7 +123,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
     */
   def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = {
     val bfStr = SparkHelpers.getBloomFilter(file, conf)
-    val bf = new com.uber.hoodie.common.BloomFilter(bfStr)
+    val bf = new com.uber.hoodie.common.BloomFilter(bfStr, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_ENABLE_DYNAMIC.toBoolean)
     val foundCount = sqlContext.parquetFile(file)
       .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
       .collect().count(r => !bf.mightContain(r.getString(0)))
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
index 3e4772a7d56d7..b60e05e803c7b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
@@ -50,6 +50,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage"
       + ".level";
   public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
+  public static final String BLOOM_INDEX_ENABLE_DYNAMIC_PROP =
+      "hoodie.bloom.index.dynamic";
+  public static final String DEFAULT_BLOOM_INDEX_ENABLE_DYNAMIC = "false";
 
   private HoodieIndexConfig(Properties props) {
     super(props);
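Note on the new option: it is a plain string property with a string default, mirroring the other index settings. A minimal sketch of how a job could opt in, assuming only the property key added above and standard `java.util.Properties`; the class name here is hypothetical:

```java
import java.util.Properties;

public class DynamicBloomConfigExample {
  public static void main(String[] args) {
    // Hypothetical wiring: set the new key on the properties that back HoodieWriteConfig.
    Properties props = new Properties();
    props.setProperty("hoodie.bloom.index.dynamic", "true"); // BLOOM_INDEX_ENABLE_DYNAMIC_PROP

    // The getter added to HoodieWriteConfig below reduces to this lookup. Boolean.parseBoolean(null)
    // returns false, so leaving the property unset matches DEFAULT_BLOOM_INDEX_ENABLE_DYNAMIC ("false").
    boolean dynamic = Boolean.parseBoolean(props.getProperty("hoodie.bloom.index.dynamic"));
    System.out.println("dynamic bloom index enabled: " + dynamic);
  }
}
```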
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index ab6570f317ca2..92fc94c611ceb 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -333,6 +333,10 @@ public StorageLevel getBloomIndexInputStorageLevel() {
         .fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
   }
 
+  public boolean getEnableDynamicBloomIndex() {
+    return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_ENABLE_DYNAMIC_PROP));
+  }
+
   /**
    * storage properties
    **/
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
index 9ac8db0b8f348..0dbf4d669aa78 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
@@ -342,7 +342,7 @@ JavaPairRDD findMatchingFilesForRecordKeys(
         .sortByKey(true, joinParallelism);
 
     return fileSortedTripletRDD.mapPartitionsWithIndex(
-        new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
+        new HoodieBloomIndexCheckFunction(metaClient, config), true)
         .flatMap(List::iterator)
         .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
         .flatMapToPair(lookupResult -> {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
index a313d2d91a623..59b46160a7717 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -23,6 +23,7 @@
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.HoodieTimer;
 import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIndexException;
 import com.uber.hoodie.func.LazyIterableIterator;
@@ -48,13 +49,13 @@ public class HoodieBloomIndexCheckFunction implements
 
   private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
 
-  private final String basePath;
+  private final HoodieWriteConfig config;
 
   private final HoodieTableMetaClient metaClient;
 
-  public HoodieBloomIndexCheckFunction(HoodieTableMetaClient metaClient, String basePath) {
+  public HoodieBloomIndexCheckFunction(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
     this.metaClient = metaClient;
-    this.basePath = basePath;
+    this.config = config;
   }
 
   /**
@@ -118,9 +119,10 @@ protected void start() {
     private void initState(String fileName, String partitionPath) throws HoodieIndexException {
       try {
-        Path filePath = new Path(basePath + "/" + partitionPath + "/" + fileName);
+        Path filePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + fileName);
         HoodieTimer timer = new HoodieTimer().startTimer();
-        bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath);
+        bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath, config
+            .getEnableDynamicBloomIndex());
         logger.info(String.format("Read bloom filter from %s/%s in %d ms", partitionPath, fileName, timer.endTimer()));
         candidateRecordKeys = new ArrayList<>();
         currentFile = fileName;
@@ -144,7 +146,7 @@ private void checkAndAddCandidates(String recordKey) {
     }
 
     private List checkAgainstCurrentFile() {
-      Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile);
+      Path filePath = new Path(config.getBasePath() + "/" + currentPartitionPath + "/" + currentFile);
       if (logger.isDebugEnabled()) {
         logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys);
       }
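Taken together, the index lookup path now resolves the filter shape from the write config instead of assuming the plain Hadoop filter. A hedged sketch of the read side, using only the signatures changed in this patch (`ParquetUtils.readBloomFilterFromParquetMetadata` and `BloomFilter.mightContain`); the class and method names of the sketch itself are illustrative:

```java
import com.uber.hoodie.common.BloomFilter;
import com.uber.hoodie.common.util.ParquetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class BloomLookupSketch {
  // Mirrors what initState() above does: read the serialized filter from the parquet footer,
  // telling ParquetUtils which wrapper shape (plain vs. dynamic) to rebuild.
  static boolean mightContainKey(Configuration conf, String basePath, String partitionPath,
      String fileName, String recordKey, boolean dynamicEnabled) {
    Path filePath = new Path(basePath + "/" + partitionPath + "/" + fileName);
    BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(conf, filePath, dynamicEnabled);
    // False positives are possible; a "true" here only makes the key a candidate for the
    // exact key-by-key check against the file.
    return filter.mightContain(recordKey);
  }
}
```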
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java
index 13874ca3623ba..cb2e85e68f14f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieStorageWriterFactory.java
@@ -41,7 +41,7 @@ public static HoodieSto
       R extends IndexedRecord> HoodieStorageWriter newParquetStorageWriter(String commitTime, Path path,
       HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable) throws IOException {
     BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(),
-        config.getBloomFilterFPP());
+        config.getBloomFilterFPP(), false);
     HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
         new AvroSchemaConverter().convert(schema), schema, filter);
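Note that the write path keeps producing the plain filter: the factory above passes a literal `false` rather than the new config flag, so only the read/index side is switchable in this change. If the writer were to honor the flag as well, the construction would presumably read as the fragment below (illustrative only, not part of this patch; it assumes the same `config` parameter already available in `newParquetStorageWriter`):

```java
// Hypothetical: let the writer build a dynamic filter when the new index flag is set.
BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(),
    config.getBloomFilterFPP(), config.getEnableDynamicBloomIndex());
```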
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
index 2507dcafa30f0..c54842d6f1937 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
@@ -196,7 +196,7 @@ public static String writeParquetFile(String basePath,
                                         boolean createCommitTime) throws IOException {
     if (filter == null) {
-      filter = new BloomFilter(10000, 0.0000001);
+      filter = new BloomFilter(10000, 0.0000001, false);
     }
     HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
         schema, filter);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
index a52881d2d4d4a..4b298f017da5a 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
@@ -265,7 +265,7 @@ public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedExcept
 
     // We write record1, record2 to a parquet file, but the bloom filter contains (record1,
     // record2, record3).
-    BloomFilter filter = new BloomFilter(10000, 0.0000001);
+    BloomFilter filter = new BloomFilter(10000, 0.0000001, false);
     filter.add(record3.getRecordKey());
     String filename = HoodieClientTestUtils
         .writeParquetFile(basePath, "2016/01/31",
@@ -479,7 +479,7 @@ public void testBloomFilterFalseError() throws IOException, InterruptedException
 
     HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()),
         rowChange2);
-    BloomFilter filter = new BloomFilter(10000, 0.0000001);
+    BloomFilter filter = new BloomFilter(10000, 0.0000001, false);
     filter.add(record2.getRecordKey());
     String filename = HoodieClientTestUtils
         .writeParquetFile(basePath, "2016/01/31",
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
index 714f0e5cdb837..2ed63d18b8d77 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
@@ -155,7 +155,8 @@ public void testUpdateRecords() throws Exception {
 
     // Read out the bloom filter and make sure filter can answer record exist or not
     Path parquetFilePath = new Path(parquetFile.getAbsolutePath());
-    BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath);
+    BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath,
+        false);
     for (HoodieRecord record : records) {
       assertTrue(filter.mightContain(record.getRecordKey()));
     }
@@ -211,7 +212,7 @@ public void testUpdateRecords() throws Exception {
     // Check whether the record has been updated
     Path updatedParquetFilePath = new Path(updatedParquetFile.getAbsolutePath());
     BloomFilter updatedFilter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(),
-        updatedParquetFilePath);
+        updatedParquetFilePath, false);
     for (HoodieRecord record : records) {
       // No change to the _row_key
       assertTrue(updatedFilter.mightContain(record.getRecordKey()));
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/BloomFilter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/BloomFilter.java
index ce2249179aade..439606d050327 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/BloomFilter.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/BloomFilter.java
@@ -28,7 +28,8 @@
 import org.apache.hadoop.util.hash.Hash;
 
 /**
- * A Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ * A Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter} or
+ * {@link org.apache.hadoop.util.bloom.DynamicBloomFilter}.
  */
 public class BloomFilter {
 
@@ -37,33 +38,49 @@ public class BloomFilter {
   */
   public static final double LOG2_SQUARED = Math.log(2) * Math.log(2);
 
-  private org.apache.hadoop.util.bloom.BloomFilter filter = null;
+  private org.apache.hadoop.util.bloom.BloomFilter bloomFilter = null;
 
-  public BloomFilter(int numEntries, double errorRate) {
-    this(numEntries, errorRate, Hash.MURMUR_HASH);
+  private org.apache.hadoop.util.bloom.DynamicBloomFilter dynamicBloomFilter = null;
+
+  private boolean isDynamic;
+
+  public BloomFilter(int numEntries, double errorRate, boolean isDynamic) {
+    this(numEntries, errorRate, Hash.MURMUR_HASH, isDynamic);
   }
 
   /**
   * Create a new Bloom filter with the given configurations.
   */
-  public BloomFilter(int numEntries, double errorRate, int hashType) {
+  public BloomFilter(int numEntries, double errorRate, int hashType, boolean isDynamic) {
     // Bit size
     int bitSize = (int) Math.ceil(numEntries * (-Math.log(errorRate) / LOG2_SQUARED));
     // Number of the hash functions
    int numHashs = (int) Math.ceil(Math.log(2) * bitSize / numEntries);
     // The filter
-    this.filter = new org.apache.hadoop.util.bloom.BloomFilter(bitSize, numHashs, hashType);
+    if (isDynamic) {
+      this.dynamicBloomFilter = new org.apache.hadoop.util.bloom.DynamicBloomFilter(bitSize, numHashs, hashType,
+          numEntries);
+    } else {
+      this.bloomFilter = new org.apache.hadoop.util.bloom.BloomFilter(bitSize, numHashs, hashType);
+    }
+    this.isDynamic = isDynamic;
   }
 
   /**
   * Create the bloom filter from serialized string.
   */
-  public BloomFilter(String filterStr) {
-    this.filter = new org.apache.hadoop.util.bloom.BloomFilter();
+  public BloomFilter(String filterStr, boolean isDynamic) {
+    this.isDynamic = isDynamic;
     byte[] bytes = DatatypeConverter.parseBase64Binary(filterStr);
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
     try {
-      this.filter.readFields(dis);
+      if (isDynamic) {
+        this.dynamicBloomFilter = new org.apache.hadoop.util.bloom.DynamicBloomFilter();
+        this.dynamicBloomFilter.readFields(dis);
+      } else {
+        this.bloomFilter = new org.apache.hadoop.util.bloom.BloomFilter();
+        this.bloomFilter.readFields(dis);
+      }
       dis.close();
     } catch (IOException e) {
       throw new HoodieIndexException("Could not deserialize BloomFilter instance", e);
@@ -74,14 +91,22 @@ public void add(String key) {
     if (key == null) {
       throw new NullPointerException("Key cannot by null");
     }
-    filter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    if (isDynamic) {
+      this.dynamicBloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    } else {
+      this.bloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    }
   }
 
   public boolean mightContain(String key) {
     if (key == null) {
       throw new NullPointerException("Key cannot by null");
     }
-    return filter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    if (isDynamic) {
+      return this.dynamicBloomFilter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    } else {
+      return this.bloomFilter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
+    }
   }
 
   /**
@@ -91,7 +116,11 @@ public String serializeToString() {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
     try {
-      filter.write(dos);
+      if (isDynamic) {
+        dynamicBloomFilter.write(dos);
+      } else {
+        bloomFilter.write(dos);
+      }
       byte[] bytes = baos.toByteArray();
       dos.close();
       return DatatypeConverter.printBase64Binary(bytes);
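The sizing math is unchanged in both branches: the constructor uses the classic Bloom filter formulas, and in the dynamic branch `numEntries` doubles as the per-slice record threshold handed to Hadoop's `DynamicBloomFilter`, which is what lets it add slices as inserts exceed the expected count. A worked example with the values used in the tests elsewhere in this patch (10000 entries at a 1e-7 false-positive rate); the class name is illustrative:

```java
public class BloomSizingExample {
  public static void main(String[] args) {
    int numEntries = 10000;
    double errorRate = 0.0000001;
    double log2Squared = Math.log(2) * Math.log(2); // same constant as BloomFilter.LOG2_SQUARED

    // m = ceil(n * (-ln p) / (ln 2)^2): number of bits in the underlying bit vector.
    int bitSize = (int) Math.ceil(numEntries * (-Math.log(errorRate) / log2Squared));
    // k = ceil(ln 2 * m / n): number of hash functions.
    int numHashs = (int) Math.ceil(Math.log(2) * bitSize / numEntries);

    // Prints roughly 335k bits (about 41 KB) and 24 hash functions.
    System.out.println("bitSize=" + bitSize + ", numHashs=" + numHashs);
  }
}
```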
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
index a112a6de29f0d..ee6a5dbd0fb07 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
@@ -146,10 +146,10 @@ public static Schema readAvroSchema(Configuration configuration, Path parquetFil
    * Read out the bloom filter from the parquet file meta data.
    */
   public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration,
-      Path parquetFilePath) {
+      Path parquetFilePath, boolean enableDynamicBloomFilter) {
     String footerVal = readParquetFooter(configuration, parquetFilePath,
         HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY).get(0);
-    return new BloomFilter(footerVal);
+    return new BloomFilter(footerVal, enableDynamicBloomFilter);
   }
 
   public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) {
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/AbstractBloomFilter.java b/hoodie-common/src/test/java/com/uber/hoodie/common/AbstractBloomFilter.java
new file mode 100644
index 0000000000000..52f1e5ca4593f
--- /dev/null
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/AbstractBloomFilter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public abstract class AbstractBloomFilter {
+
+  protected void testAddKey(boolean isDynamic) {
+    BloomFilter filter = new BloomFilter(100, 0.0000001, isDynamic);
+    filter.add("key1");
+    assert (filter.mightContain("key1"));
+  }
+
+  protected int testOverloadBloom(boolean isDynamic) {
+    BloomFilter filter = new BloomFilter(10, 0.0000001, isDynamic);
+    for (int i = 0; i < 100; i++) {
+      filter.add(UUID.randomUUID().toString());
+    }
+    int falsePositives = 0;
+    for (int i = 0; i < 100; i++) {
+      if (filter.mightContain(UUID.randomUUID().toString())) {
+        falsePositives++;
+      }
+    }
+    return falsePositives;
+  }
+
+  protected void testSerialize(boolean isDynamic) throws IOException, ClassNotFoundException {
+    BloomFilter filter = new BloomFilter(1000, 0.0000001, isDynamic);
+    filter.add("key1");
+    filter.add("key2");
+    String filterStr = filter.serializeToString();
+
+    // Rebuild
+    BloomFilter newFilter = new BloomFilter(filterStr, isDynamic);
+    assert (newFilter.mightContain("key1"));
+    assert (newFilter.mightContain("key2"));
+  }
+}
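The `testOverloadBloom` helper above is what actually distinguishes the two modes: a plain filter sized for 10 entries is saturated after 100 inserts and is expected to report false positives, while the dynamic variant is expected to report none because the underlying `DynamicBloomFilter` keeps adding fresh slices once the per-slice threshold (`numEntries`) is exceeded. An equivalent standalone, side-by-side check, assuming only the `BloomFilter` wrapper changed in this patch (the class name is illustrative):

```java
import com.uber.hoodie.common.BloomFilter;
import java.util.UUID;

public class OverloadSketch {
  public static void main(String[] args) {
    // Deliberately undersized: room for 10 entries, fed 100 random keys.
    BloomFilter plain = new BloomFilter(10, 0.0000001, false);
    BloomFilter dynamic = new BloomFilter(10, 0.0000001, true);
    for (int i = 0; i < 100; i++) {
      String key = UUID.randomUUID().toString();
      plain.add(key);
      dynamic.add(key);
    }
    int plainFalsePositives = 0;
    int dynamicFalsePositives = 0;
    for (int i = 0; i < 100; i++) {
      String probe = UUID.randomUUID().toString();
      if (plain.mightContain(probe)) {
        plainFalsePositives++;
      }
      if (dynamic.mightContain(probe)) {
        dynamicFalsePositives++;
      }
    }
    // The overloaded plain filter drifts toward answering "maybe" for everything;
    // the dynamic one should stay near its configured false-positive rate.
    System.out.println("plain=" + plainFalsePositives + ", dynamic=" + dynamicFalsePositives);
  }
}
```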
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/TestBloomFilter.java b/hoodie-common/src/test/java/com/uber/hoodie/common/TestBloomFilter.java
index dbc3567ff8d65..50211247c4df5 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/TestBloomFilter.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/TestBloomFilter.java
@@ -19,25 +19,21 @@
 import java.io.IOException;
 import org.junit.Test;
 
-public class TestBloomFilter {
+public class TestBloomFilter extends AbstractBloomFilter {
 
   @Test
   public void testAddKey() {
-    BloomFilter filter = new BloomFilter(100, 0.0000001);
-    filter.add("key1");
-    assert (filter.mightContain("key1"));
+    testAddKey(false);
   }
 
   @Test
-  public void testSerialize() throws IOException, ClassNotFoundException {
-    BloomFilter filter = new BloomFilter(1000, 0.0000001);
-    filter.add("key1");
-    filter.add("key2");
-    String filterStr = filter.serializeToString();
+  public void testOverloadBloom() {
+    int falsePositives = testOverloadBloom(false);
+    assert (falsePositives > 0);
+  }
 
-    // Rebuild
-    BloomFilter newFilter = new BloomFilter(filterStr);
-    assert (newFilter.mightContain("key1"));
-    assert (newFilter.mightContain("key2"));
+  @Test
+  public void testSerialize() throws IOException, ClassNotFoundException {
+    testSerialize(false);
   }
 }
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/TestDynamicBloomFilter.java b/hoodie-common/src/test/java/com/uber/hoodie/common/TestDynamicBloomFilter.java
new file mode 100644
index 0000000000000..13d201cb87785
--- /dev/null
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/TestDynamicBloomFilter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common;
+
+import java.io.IOException;
+import org.junit.Test;
+
+public class TestDynamicBloomFilter extends AbstractBloomFilter {
+
+  @Test
+  public void testAddKey() {
+    testAddKey(true);
+  }
+
+  @Test
+  public void testOverloadBloom() {
+    int falsePositives = testOverloadBloom(true);
+    assert (falsePositives == 0);
+  }
+
+  @Test
+  public void testSerialize() throws IOException, ClassNotFoundException {
+    testSerialize(true);
+  }
+
+}
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java
index 3d5949b6ab2ef..b512b72530cad 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java
@@ -72,7 +72,7 @@ public void testHoodieWriteSupport() throws Exception {
     assertEquals("Did not read back the expected list of keys", rowKeys, rowKeysInFile);
     BloomFilter filterInFile = ParquetUtils.readBloomFilterFromParquetMetadata(HoodieTestUtils.getDefaultHadoopConf(),
-        new Path(filePath));
+        new Path(filePath), false);
     for (String rowKey : rowKeys) {
       assertTrue("key should be found in bloom filter", filterInFile.mightContain(rowKey));
     }
@@ -109,7 +109,7 @@ private void writeParquetFile(String filePath, List rowKeys) throws Exception {
     // Write out a parquet file
     Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    BloomFilter filter = new BloomFilter(1000, 0.0001);
+    BloomFilter filter = new BloomFilter(1000, 0.0001, false);
     HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
         schema, filter);
     ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP,
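One thing the serialized form does not carry is the `isDynamic` flag itself: `serializeToString()` writes only the underlying Hadoop filter's fields, so whoever rebuilds the filter (the tests above, or `ParquetUtils.readBloomFilterFromParquetMetadata`) has to supply the same flag that was used at write time. A small round-trip sketch using only the wrapper changed in this patch (the class name is illustrative):

```java
import com.uber.hoodie.common.BloomFilter;

public class RoundTripSketch {
  public static void main(String[] args) {
    boolean isDynamic = true;
    BloomFilter original = new BloomFilter(1000, 0.0000001, isDynamic);
    original.add("key1");
    original.add("key2");

    // The base64 payload is the Hadoop filter's wire format only; the dynamic/plain choice
    // is not encoded in it, so the reader must be told which class to deserialize into.
    String serialized = original.serializeToString();
    BloomFilter rebuilt = new BloomFilter(serialized, isDynamic);

    System.out.println(rebuilt.mightContain("key1")); // true
    System.out.println(rebuilt.mightContain("key2")); // true
  }
}
```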
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
index 7093bcf9c7d7c..ac72f1fee1770 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
@@ -276,7 +276,7 @@ private static void generateParquetData(Path filePath, boolean isParquetSchemaSi
     Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
     org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
-    BloomFilter filter = new BloomFilter(1000, 0.0001);
+    BloomFilter filter = new BloomFilter(1000, 0.0001, false);
     HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter);
     ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,