From 8e59ed8dad4da1b7f7d423b573eb68f6ca03e65a Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Wed, 23 May 2018 22:52:10 +0800 Subject: [PATCH 1/5] PARQUET-41: rebase to latest master remove some obsolete changes --- .../apache/parquet/cli/util/Expressions.java | 4 +- .../parquet/column/ParquetProperties.java | 3 + .../parquet/column/values/bloom/Bloom.java | 369 ++++++++++++++++++ .../column/values/bloom/TestBloom.java | 111 ++++++ pom.xml | 2 +- 5 files changed, 486 insertions(+), 3 deletions(-) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java index 06b28b46ae..d18ef559f2 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Expressions.java @@ -19,7 +19,7 @@ package org.apache.parquet.cli.util; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.avro.Schema; @@ -385,7 +385,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("type", type) .add("value", value) .add("children", children) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 39b65da9fa..3abb5365fc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -48,6 +48,9 @@ public class ParquetProperties { public static final int 
DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; + // TODO: need to discuss a maximum value + public static final int DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES = 16 * 1024 * 1024; + public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); private static final int MIN_SLAB_SIZE = 64; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java new file mode 100644 index 0000000000..c942a556f2 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.bloom; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; + +import com.google.common.hash.Hashing; +import com.google.common.hash.HashFunction; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.*; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.io.api.Binary; + +/** + * Bloom Filter is a compact structure to indicate whether an item is not in set or probably + * in set. Bloom class is underlying class of Bloom Filter which stores a bit set represents + * elements set, hash strategy and bloom filter algorithm. + * + * Bloom Filter algorithm is implemented using block Bloom filters from Putze et al.'s "Cache-, + * Hash- and Space-Efficient Bloom Filters". The basic idea is to hash the item to a tiny Bloom + * Filter which size fit a single cache line or smaller. This implementation sets 8 bits in + * each tiny Bloom Filter. Tiny bloom filter are 32 bytes to take advantage of 32-bytes SIMD + * instruction. + */ + +public class Bloom { + // Hash strategy available for bloom filter. + public enum HashStrategy { + MURMUR3_X64_128, + } + + // Bloom filter algorithm. + public enum Algorithm { + BLOCK, + } + + /** + * Default false positive probability value use to calculate optimal number of bits + * used by bloom filter. + */ + public final double DEFAULT_FPP = 0.01; + + // Bloom filter data header, including number of bytes, hash strategy and algorithm. + public static final int HEADER_SIZE = 12; + + // Bytes in a tiny bloom filter block. + public static final int BYTES_PER_FILTER_BLOCK = 32; + + // Default seed for hash function + public static final int DEFAULT_SEED = 104729; + + // Hash strategy used in this bloom filter. + public final HashStrategy hashStrategy; + + // Algorithm applied of this bloom filter. 
+ public final Algorithm algorithm; + + // The underlying byte array for bloom filter bitset. + private byte[] bitset; + + // A integer array buffer of underlying bitset help setting bits. + private IntBuffer intBuffer; + + // Hash function use to compute hash for column value. + private HashFunction hashFunction; + + // The block based algorithm needs 8 odd SALT values to calculate eight index + // of bit to set, one bit in 32-bit word. + private static final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, + 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; + + /** + * Constructor of bloom filter, if numBytes is zero, bloom filter bitset + * will be created lazily and the number of bytes will be calculated through + * distinct values in cache. It use murmur3_x64_128 as its default hash function + * and block based algorithm as default algorithm. + * @param numBytes The number of bytes for bloom filter bitset, set to zero can + * let it calculate number automatically by using default DEFAULT_FPP. + */ + public Bloom(int numBytes) { + this(numBytes, HashStrategy.MURMUR3_X64_128, Algorithm.BLOCK); + } + + /** + * Constructor of bloom filter, if numBytes is zero, bloom filter bitset + * will be created lazily and the number of bytes will be calculated through + * distinct values in cache. + * @param numBytes The number of bytes for bloom filter bitset, set to zero can + * let it calculate number automatically by using default DEFAULT_FPP. + * @param hashStrategy The hash strategy bloom filter apply. + * @param algorithm The algorithm of bloom filter. 
+ */ + private Bloom(int numBytes, HashStrategy hashStrategy, Algorithm algorithm) { + initBitset(numBytes); + + switch (hashStrategy) { + case MURMUR3_X64_128: + this.hashStrategy = hashStrategy; + hashFunction = Hashing.murmur3_128(DEFAULT_SEED); + break; + default: + throw new RuntimeException("Not supported hash strategy"); + } + + this.algorithm = algorithm; + } + + + /** + * Construct the bloom filter with given bit set, it is used when reconstruct + * bloom filter from parquet file.It use murmur3_x64_128 as its default hash + * function and block based algorithm as default algorithm. + * @param bitset The given bitset to construct bloom filter. + */ + public Bloom(byte[] bitset) { + this(bitset, HashStrategy.MURMUR3_X64_128, Algorithm.BLOCK); + } + + /** + * Construct the bloom filter with given bit set, it is used + * when reconstruct bloom filter from parquet file. + * @param bitset The given bitset to construct bloom filter. + * @param hashStrategy The hash strategy bloom filter apply. + * @param algorithm The algorithm of bloom filter. + */ + private Bloom(byte[] bitset, HashStrategy hashStrategy, Algorithm algorithm) { + this.bitset = bitset; + this.intBuffer = ByteBuffer.wrap(bitset).asIntBuffer(); + + switch (hashStrategy) { + case MURMUR3_X64_128: + this.hashStrategy = hashStrategy; + hashFunction = Hashing.murmur3_128(DEFAULT_SEED); + break; + default: + throw new RuntimeException("Not supported hash strategy"); + } + this.algorithm = algorithm; + } + + /** + * Create a new bitset for bloom filter, at least 256 bits will be create. + * @param numBytes number of bytes for bit set. + */ + private void initBitset(int numBytes) { + if (numBytes < BYTES_PER_FILTER_BLOCK) { + numBytes = BYTES_PER_FILTER_BLOCK; + } + + // Get next power of 2 if it is not power of 2. 
+ if ((numBytes & (numBytes - 1)) != 0) { + numBytes = Integer.highestOneBit(numBytes) << 1; + } + + if (numBytes > ParquetProperties.DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES || numBytes < 0) { + numBytes = ParquetProperties.DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES; + } + + this.bitset = new byte[numBytes]; + this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.BIG_ENDIAN).asIntBuffer(); + } + + /** + * Write bloom filter to output stream. A bloom filter structure should include + * bitset length, hash strategy, algorithm, and bitset. + * @param out output stream to write + */ + public void writeTo(OutputStream out) throws IOException { + // Write number of bytes of bitset. + out.write(BytesUtils.intToBytes(bitset.length)); + + // Write hash strategy + out.write(BytesUtils.intToBytes(this.hashStrategy.ordinal())); + + // Write algorithm + out.write(BytesUtils.intToBytes(this.algorithm.ordinal())); + + // Write bitset + out.write(bitset); + } + + private int[] setMask(int key) { + int mask[] = new int[8]; + + for (int i = 0; i < 8; ++i) { + mask[i] = key * SALT[i]; + } + + for (int i = 0; i < 8; ++i) { + mask[i] = mask[i] >>> 27; + } + + for (int i = 0; i < 8; ++i) { + mask[i] = 0x1 << mask[i]; + } + + return mask; + } + + /** + * Add an element to bloom filter, the element content is represented by + * the hash value of its plain encoding result. + * @param hash the hash result of element. + */ + private void addElement(long hash) { + int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_FILTER_BLOCK - 1); + int key = (int)hash; + + // Calculate mask for bucket. + int mask[] = setMask(key); + + for (int i = 0; i < 8; i++) { + int value = intBuffer.get(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i); + value |= mask[i]; + intBuffer.put(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i, value); + } + } + + /** + * Determine where an element is in set or not. + * @param hash the hash value of element plain encoding result. 
+ * @return false if element is must not in set, true if element probably in set. + */ + private boolean contains(long hash) { + int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_FILTER_BLOCK - 1); + int key = (int)hash; + + // Calculate mask for bucket. + int mask[] = setMask(key); + + for (int i = 0; i < 8; i++) { + if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i) & mask[i])) { + return false; + } + } + + return true; + } + + /** + * Calculate optimal size according to the number of distinct values and false positive probability. + * @param n: The number of distinct values. + * @param p: The false positive probability. + * @return optimal number of bits of given n and p. + */ + public static int optimalNumOfBits(long n, double p) { + Preconditions.checkArgument((p > 0.0 && p < 1.0), + "FPP should be less than 1.0 and great than 0.0"); + + final double M = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8)); + final double MAX = ParquetProperties.DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES << 3; + int numBits = (int)M; + + // Handle overflow. + if (M > MAX || M < 0) { + numBits = (int)MAX; + } + + // Get next power of 2 if bits is not power of 2. + if ((numBits & (numBits - 1)) != 0) { + numBits = Integer.highestOneBit(numBits) << 1; + } + + // Minimum + if (numBits < (BYTES_PER_FILTER_BLOCK << 3)) { + numBits = BYTES_PER_FILTER_BLOCK << 3; + } + + return numBits; + } + + /** + * used to decide if we want to work to the next page + * @return Bytes buffered of bloom filter. + */ + public long getBufferedSize() { + return bitset.length; + } + + /** + * Compute hash for int value by using its plain encoding result. 
+ * @param value the value to hash + * @return hash result + */ + public long hash(int value) { + ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE); + plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value); + return hashFunction.hashBytes(plain.array()).asLong(); + } + + /** + * Compute hash for long value by using its plain encoding result. + * @param value the value to hash + * @return hash result + */ + public long hash(long value) { + ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE); + plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value); + return hashFunction.hashBytes(plain.array()).asLong(); + } + + /** + * Compute hash for double value by using its plain encoding result. + * @param value the value to hash + * @return hash result + */ + public long hash(double value) { + ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE); + plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value); + return hashFunction.hashBytes(plain.array()).asLong(); + } + + /** + * Compute hash for float value by using its plain encoding result. + * @param value the value to hash + * @return hash result + */ + public long hash(float value) { + ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE); + plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value); + return hashFunction.hashBytes(plain.array()).asLong(); + } + + /** + * Compute hash for Binary value by using its plain encoding result. + * @param value the value to hash + * @return hash result + */ + public long hash(Binary value) { + return hashFunction.hashBytes(value.toByteBuffer()).asLong(); + } + + /** + * Insert element to set represented by bloom bitset. + * @param hash the hash of value to insert into bloom filter.. + */ + public void insert(long hash) { + addElement(hash); + } + + /** + * Determine whether an element exist in set or not. + * @param hash the element to contain. + * @return false if value is definitely not in set, and true means PROBABLY in set. 
+ */ + public boolean find(long hash) { + return contains(hash); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java new file mode 100644 index 0000000000..ade4bae3fd --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.column.values.bloom; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.parquet.column.values.RandomStr; +import org.apache.parquet.io.api.Binary; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestBloom { + @Test + public void testIntBloom () throws IOException { + Bloom bloom = new Bloom(279); + assertEquals("bloom filter size should be adjust to 512 bytes if input bytes is 279 bytes", + bloom.getBufferedSize(), 512); + + for(int i = 0; i<10; i++) { + bloom.insert(bloom.hash(i)); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream((int)bloom.getBufferedSize() + bloom.HEADER_SIZE); + bloom.writeTo(baos); + + ByteBuffer bloomBuffer = ByteBuffer.wrap(baos.toByteArray()); + + int length = Integer.reverseBytes(bloomBuffer.getInt()); + int hash = Integer.reverseBytes(bloomBuffer.getInt()); + int algorithm = Integer.reverseBytes(bloomBuffer.getInt()); + + byte[] bitset = new byte[length]; + bloomBuffer.get(bitset); + + bloom = new Bloom(bitset); + + for(int i = 0; i < 10; i++) { + assertTrue(bloom.find(bloom.hash(i))); + } + } + + @Test + public void testBinaryBloom() throws IOException { + final long SEED = 104729; + Bloom binaryBloom = new Bloom(Bloom.optimalNumOfBits(100000, 0.01)); + + List strings = new ArrayList<>(); + RandomStr randomStr = new RandomStr(new Random(SEED)); + for(int i = 0; i < 100000; i++) { + String str = randomStr.get(10); + strings.add(str); + binaryBloom.insert(binaryBloom.hash(Binary.fromString(str))); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream( + (int)binaryBloom.getBufferedSize() + binaryBloom.HEADER_SIZE); + binaryBloom.writeTo(baos); + + ByteBuffer bloomBuffer = ByteBuffer.wrap(baos.toByteArray()); + + int length = 
Integer.reverseBytes(bloomBuffer.getInt()); + int hash = Integer.reverseBytes(bloomBuffer.getInt()); + int algorithm = Integer.reverseBytes(bloomBuffer.getInt()); + + byte[] bitset = new byte[length]; + bloomBuffer.get(bitset); + + binaryBloom = new Bloom(bitset); + + for(int i = 0; i < strings.size(); i++) { + assertTrue(binaryBloom.find(binaryBloom.hash(Binary.fromString(strings.get(i))))); + } + + // exist can be true at probability 0.01. + int exist = 0; + for (int i = 0; i < 100000; i++) { + String str = randomStr.get(8); + if (binaryBloom.find(binaryBloom.hash(Binary.fromString(str)))) { + exist ++; + } + } + + // exist should be probably less than 1000 according default FPP 0.01. + assertTrue(exist < 1200); + } +} diff --git a/pom.xml b/pom.xml index 7b3f36fe5b..21c399d3a6 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ 0.9.33 1.7.22 1.8.2 - 20.0 + 23.0 0.1.1 1.10.19 From 9a1955dd5c5edae4fedbb43251e6c3e7ce5ad620 Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Thu, 21 Jun 2018 19:41:38 +0800 Subject: [PATCH 2/5] PARQUET-41: update intBuffer to use little endian for cross compatibility issue --- .../java/org/apache/parquet/column/values/bloom/Bloom.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java index c942a556f2..264ac16bc4 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java @@ -145,7 +145,7 @@ public Bloom(byte[] bitset) { */ private Bloom(byte[] bitset, HashStrategy hashStrategy, Algorithm algorithm) { this.bitset = bitset; - this.intBuffer = ByteBuffer.wrap(bitset).asIntBuffer(); + this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); switch (hashStrategy) { case MURMUR3_X64_128: @@ -177,7 +177,7 @@ private 
void initBitset(int numBytes) { } this.bitset = new byte[numBytes]; - this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.BIG_ENDIAN).asIntBuffer(); + this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); } /** From ec9cefdea746ae241d7749025763d05cc6e534b4 Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Thu, 28 Jun 2018 16:34:26 +0800 Subject: [PATCH 3/5] PARQUET-1332: update according to review comments from parquet-cpp --- .../parquet/column/ParquetProperties.java | 8 ++--- .../parquet/column/values/bloom/Bloom.java | 29 ++----------------- .../column/values/bloom/TestBloom.java | 10 ++++--- 3 files changed, 13 insertions(+), 34 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 3abb5365fc..5543c8d33c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,7 @@ public class ParquetProperties { public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - // TODO: need to discuss a maximum value + // TODO: need to discuss a maximum value when doing write side task public static final int DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES = 16 * 1024 * 1024; public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java index 264ac16bc4..07e78e57fc 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java @@ -54,19 +54,13 @@ public enum Algorithm { BLOCK, } - /** - * Default false positive probability value use to calculate optimal number of bits - * used by bloom filter. - */ - public final double DEFAULT_FPP = 0.01; - // Bloom filter data header, including number of bytes, hash strategy and algorithm. public static final int HEADER_SIZE = 12; // Bytes in a tiny bloom filter block. public static final int BYTES_PER_FILTER_BLOCK = 32; - // Default seed for hash function + // Default seed for hash function, it comes from Murmur3 from Hive. public static final int DEFAULT_SEED = 104729; // Hash strategy used in this bloom filter. @@ -222,7 +216,7 @@ private int[] setMask(int key) { * the hash value of its plain encoding result. * @param hash the hash result of element. 
*/ - private void addElement(long hash) { + public void insert(long hash) { int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_FILTER_BLOCK - 1); int key = (int)hash; @@ -241,7 +235,7 @@ private void addElement(long hash) { * @param hash the hash value of element plain encoding result. * @return false if element is must not in set, true if element probably in set. */ - private boolean contains(long hash) { + public boolean find(long hash) { int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_FILTER_BLOCK - 1); int key = (int)hash; @@ -349,21 +343,4 @@ public long hash(float value) { public long hash(Binary value) { return hashFunction.hashBytes(value.toByteBuffer()).asLong(); } - - /** - * Insert element to set represented by bloom bitset. - * @param hash the hash of value to insert into bloom filter.. - */ - public void insert(long hash) { - addElement(hash); - } - - /** - * Determine whether an element exist in set or not. - * @param hash the element to contain. - * @return false if value is definitely not in set, and true means PROBABLY in set. 
- */ - public boolean find(long hash) { - return contains(hash); - } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java index ade4bae3fd..5e00d6060e 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java @@ -66,12 +66,14 @@ public void testIntBloom () throws IOException { @Test public void testBinaryBloom() throws IOException { + int totalCount = 100000; + double fpp = 0.01; final long SEED = 104729; - Bloom binaryBloom = new Bloom(Bloom.optimalNumOfBits(100000, 0.01)); + Bloom binaryBloom = new Bloom(Bloom.optimalNumOfBits(totalCount, fpp)); List strings = new ArrayList<>(); RandomStr randomStr = new RandomStr(new Random(SEED)); - for(int i = 0; i < 100000; i++) { + for(int i = 0; i < totalCount; i++) { String str = randomStr.get(10); strings.add(str); binaryBloom.insert(binaryBloom.hash(Binary.fromString(str))); @@ -98,7 +100,7 @@ public void testBinaryBloom() throws IOException { // exist can be true at probability 0.01. int exist = 0; - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < totalCount; i++) { String str = randomStr.get(8); if (binaryBloom.find(binaryBloom.hash(Binary.fromString(str)))) { exist ++; @@ -106,6 +108,6 @@ public void testBinaryBloom() throws IOException { } // exist should be probably less than 1000 according default FPP 0.01. 
- assertTrue(exist < 1200); + assertTrue(exist < totalCount*fpp); } } From 6a4347657478235f66999df4be3516306b936477 Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Sun, 15 Jul 2018 13:51:35 +0800 Subject: [PATCH 4/5] PARQUET-1342: update according to comments from parquet-cpp --- .../parquet/column/ParquetProperties.java | 9 +- .../bloom/{Bloom.java => BloomFilter.java} | 170 ++++++++++-------- .../{TestBloom.java => TestBloomFilter.java} | 80 +++++---- 3 files changed, 143 insertions(+), 116 deletions(-) rename parquet-column/src/test/java/org/apache/parquet/column/values/bloom/{Bloom.java => BloomFilter.java} (58%) rename parquet-column/src/test/java/org/apache/parquet/column/values/bloom/{TestBloom.java => TestBloomFilter.java} (55%) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 5543c8d33c..39b65da9fa 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,9 +48,6 @@ public class ParquetProperties { public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - // TODO: need to discuss a maximum value when doing write side task - public static final int DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES = 16 * 1024 * 1024; - public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); private static final int MIN_SLAB_SIZE = 64; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/BloomFilter.java similarity index 58% rename from parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java rename to parquet-column/src/test/java/org/apache/parquet/column/values/bloom/BloomFilter.java index 07e78e57fc..e54c7ae32b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/Bloom.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/BloomFilter.java @@ -28,23 +28,22 @@ import com.google.common.hash.HashFunction; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.*; -import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.io.api.Binary; /** - * Bloom Filter is a compact structure to indicate whether an item is not in set or probably - * in set. Bloom class is underlying class of Bloom Filter which stores a bit set represents - * elements set, hash strategy and bloom filter algorithm. + * A Bloom filter is a compact structure to indicate whether an item is not in a set or probably + * in a set. 
BloomFilter class stores a bit set represents a elements set, a hash strategy and a + * Bloom filter algorithm. * - * Bloom Filter algorithm is implemented using block Bloom filters from Putze et al.'s "Cache-, - * Hash- and Space-Efficient Bloom Filters". The basic idea is to hash the item to a tiny Bloom - * Filter which size fit a single cache line or smaller. This implementation sets 8 bits in - * each tiny Bloom Filter. Tiny bloom filter are 32 bytes to take advantage of 32-bytes SIMD + * This Bloom filter is implemented using block-based Bloom filter algorithm from Putze et al.'s + * "Cache-, Hash- and Space-Efficient Bloom filters". The basic idea is to hash the item to a tiny + * Bloom filter which size fit a single cache line or smaller. This implementation sets 8 bits in + * each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take advantage of 32-byte SIMD * instruction. */ -public class Bloom { - // Hash strategy available for bloom filter. +public class BloomFilter { + // Bloom filter Hash strategy . public enum HashStrategy { MURMUR3_X64_128, } @@ -54,57 +53,67 @@ public enum Algorithm { BLOCK, } - // Bloom filter data header, including number of bytes, hash strategy and algorithm. + // The Bloom filter header includes the number of bytes, hash strategy and algorithm. public static final int HEADER_SIZE = 12; - // Bytes in a tiny bloom filter block. + // Bytes in a tiny Bloom filter block. public static final int BYTES_PER_FILTER_BLOCK = 32; // Default seed for hash function, it comes from Murmur3 from Hive. public static final int DEFAULT_SEED = 104729; - // Hash strategy used in this bloom filter. + // Minimum Bloom filter size, it sets to x86_64 cache alignment. 
+ public static final int MINIMUM_BLOOM_FILTER_BYTES = 64; + + // The number of bits to set in a tiny Bloom filter + public static final int BITS_SET_PER_BLOCK = 8; + + // Maximum Bloom filter size, it sets to default HDFS block size for upper boundary check + // This should be re-consider when implementing write side logic. + public static final int MAXIMUM_BLOOM_FILTER_BYTES = 128 * 1024 * 1024; + + // Hash strategy used in this Bloom filter. public final HashStrategy hashStrategy; - // Algorithm applied of this bloom filter. + // Algorithm used in this Bloom filter. public final Algorithm algorithm; - // The underlying byte array for bloom filter bitset. + // The underlying byte array for Bloom filter bitset. private byte[] bitset; - // A integer array buffer of underlying bitset help setting bits. + // A integer array buffer of underlying bitset to help setting bits. private IntBuffer intBuffer; // Hash function use to compute hash for column value. private HashFunction hashFunction; - // The block based algorithm needs 8 odd SALT values to calculate eight index + // The block-based algorithm needs 8 odd SALT values to calculate eight index // of bit to set, one bit in 32-bit word. private static final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; /** - * Constructor of bloom filter, if numBytes is zero, bloom filter bitset - * will be created lazily and the number of bytes will be calculated through - * distinct values in cache. It use murmur3_x64_128 as its default hash function - * and block based algorithm as default algorithm. - * @param numBytes The number of bytes for bloom filter bitset, set to zero can - * let it calculate number automatically by using default DEFAULT_FPP. + * Constructor of Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. 
The range of num_bytes should be within + * [MINIMUM_BLOOM_FILTER_BYTES, MAXIMUM_BLOOM_FILTER_BYTES], it will be rounded up/down + * to lower/upper bound if num_bytes is out of range and also will rounded up to a power + * of 2. It uses murmur3_x64_128 as its default hash function and block-based algorithm + * as default algorithm. */ - public Bloom(int numBytes) { + public BloomFilter(int numBytes) { this(numBytes, HashStrategy.MURMUR3_X64_128, Algorithm.BLOCK); } /** - * Constructor of bloom filter, if numBytes is zero, bloom filter bitset - * will be created lazily and the number of bytes will be calculated through - * distinct values in cache. - * @param numBytes The number of bytes for bloom filter bitset, set to zero can - * let it calculate number automatically by using default DEFAULT_FPP. - * @param hashStrategy The hash strategy bloom filter apply. - * @param algorithm The algorithm of bloom filter. + * Constructor of Bloom filter. It uses murmur3_x64_128 as its default hash + * function and block-based algorithm as its default algorithm. + * + * @param numBytes The number of bytes for Bloom filter bitset + * @param hashStrategy The hash strategy of Bloom filter. + * @param algorithm The algorithm of Bloom filter. */ - private Bloom(int numBytes, HashStrategy hashStrategy, Algorithm algorithm) { + private BloomFilter(int numBytes, HashStrategy hashStrategy, Algorithm algorithm) { initBitset(numBytes); switch (hashStrategy) { @@ -121,23 +130,28 @@ private Bloom(int numBytes, HashStrategy hashStrategy, Algorithm algorithm) { /** - * Construct the bloom filter with given bit set, it is used when reconstruct - * bloom filter from parquet file.It use murmur3_x64_128 as its default hash - * function and block based algorithm as default algorithm. - * @param bitset The given bitset to construct bloom filter. + * Construct the Bloom filter with given bitset, it is used when reconstructing + * Bloom filter from parquet file. 
It use murmur3_x64_128 as its default hash + * function and block-based algorithm as default algorithm. + * + * @param bitset The given bitset to construct Bloom filter. */ - public Bloom(byte[] bitset) { + public BloomFilter(byte[] bitset) { this(bitset, HashStrategy.MURMUR3_X64_128, Algorithm.BLOCK); } /** - * Construct the bloom filter with given bit set, it is used - * when reconstruct bloom filter from parquet file. - * @param bitset The given bitset to construct bloom filter. - * @param hashStrategy The hash strategy bloom filter apply. - * @param algorithm The algorithm of bloom filter. + * Construct the Bloom filter with given bitset, it is used when reconstructing + * Bloom filter from parquet file. + * + * @param bitset The given bitset to construct Bloom filter. + * @param hashStrategy The hash strategy Bloom filter apply. + * @param algorithm The algorithm of Bloom filter. */ - private Bloom(byte[] bitset, HashStrategy hashStrategy, Algorithm algorithm) { + private BloomFilter(byte[] bitset, HashStrategy hashStrategy, Algorithm algorithm) { + if (bitset == null) { + throw new RuntimeException("Given bitset is null"); + } this.bitset = bitset; this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); @@ -153,12 +167,17 @@ private Bloom(byte[] bitset, HashStrategy hashStrategy, Algorithm algorithm) { } /** - * Create a new bitset for bloom filter, at least 256 bits will be create. - * @param numBytes number of bytes for bit set. + * Create a new bitset for Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within + * [MINIMUM_BLOOM_FILTER_BYTES, MAXIMUM_BLOOM_FILTER_BYTES], it will be rounded up/down + * to lower/upper bound if num_bytes is out of range and also will rounded up to a power + * of 2. It uses murmur3_x64_128 as its default hash function and block-based algorithm + * as default algorithm. 
*/ private void initBitset(int numBytes) { - if (numBytes < BYTES_PER_FILTER_BLOCK) { - numBytes = BYTES_PER_FILTER_BLOCK; + if (numBytes < MINIMUM_BLOOM_FILTER_BYTES) { + numBytes = MINIMUM_BLOOM_FILTER_BYTES; } // Get next power of 2 if it is not power of 2. @@ -166,8 +185,8 @@ private void initBitset(int numBytes) { numBytes = Integer.highestOneBit(numBytes) << 1; } - if (numBytes > ParquetProperties.DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES || numBytes < 0) { - numBytes = ParquetProperties.DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES; + if (numBytes > MAXIMUM_BLOOM_FILTER_BYTES || numBytes < 0) { + numBytes = MAXIMUM_BLOOM_FILTER_BYTES; } this.bitset = new byte[numBytes]; @@ -175,9 +194,10 @@ private void initBitset(int numBytes) { } /** - * Write bloom filter to output stream. A bloom filter structure should include - * bitset length, hash strategy, algorithm, and bitset. - * @param out output stream to write + * Write the Bloom filter to an output stream. It writes the Bloom filter header includes the + * bitset's length in size of byte, the hash strategy, the algorithm, and the bitset. + * + * @param out the output stream to write */ public void writeTo(OutputStream out) throws IOException { // Write number of bytes of bitset. 
@@ -194,17 +214,17 @@ public void writeTo(OutputStream out) throws IOException { } private int[] setMask(int key) { - int mask[] = new int[8]; + int mask[] = new int[BITS_SET_PER_BLOCK]; - for (int i = 0; i < 8; ++i) { + for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { mask[i] = key * SALT[i]; } - for (int i = 0; i < 8; ++i) { + for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { mask[i] = mask[i] >>> 27; } - for (int i = 0; i < 8; ++i) { + for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { mask[i] = 0x1 << mask[i]; } @@ -212,8 +232,9 @@ private int[] setMask(int key) { } /** - * Add an element to bloom filter, the element content is represented by + * Add an element to Bloom filter, the element content is represented by * the hash value of its plain encoding result. + * * @param hash the hash result of element. */ public void insert(long hash) { @@ -223,7 +244,7 @@ public void insert(long hash) { // Calculate mask for bucket. int mask[] = setMask(key); - for (int i = 0; i < 8; i++) { + for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { int value = intBuffer.get(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i); value |= mask[i]; intBuffer.put(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i, value); @@ -231,7 +252,8 @@ public void insert(long hash) { } /** - * Determine where an element is in set or not. + * Determine whether an element is in set or not. + * * @param hash the hash value of element plain encoding result. * @return false if element is must not in set, true if element probably in set. */ @@ -239,10 +261,10 @@ public boolean find(long hash) { int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_FILTER_BLOCK - 1); int key = (int)hash; - // Calculate mask for bucket. + // Calculate mask for the tiny Bloom filter. 
int mask[] = setMask(key); - for (int i = 0; i < 8; i++) { + for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_FILTER_BLOCK / 4) + i) & mask[i])) { return false; } @@ -253,6 +275,7 @@ public boolean find(long hash) { /** * Calculate optimal size according to the number of distinct values and false positive probability. + * * @param n: The number of distinct values. * @param p: The false positive probability. * @return optimal number of bits of given n and p. @@ -261,12 +284,12 @@ public static int optimalNumOfBits(long n, double p) { Preconditions.checkArgument((p > 0.0 && p < 1.0), "FPP should be less than 1.0 and great than 0.0"); - final double M = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8)); - final double MAX = ParquetProperties.DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES << 3; - int numBits = (int)M; + final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8)); + final double MAX = MAXIMUM_BLOOM_FILTER_BYTES << 3; + int numBits = (int)m; // Handle overflow. - if (M > MAX || M < 0) { + if (m > MAX || m < 0) { numBits = (int)MAX; } @@ -275,24 +298,25 @@ public static int optimalNumOfBits(long n, double p) { numBits = Integer.highestOneBit(numBits) << 1; } - // Minimum - if (numBits < (BYTES_PER_FILTER_BLOCK << 3)) { - numBits = BYTES_PER_FILTER_BLOCK << 3; + if (numBits < (MINIMUM_BLOOM_FILTER_BYTES << 3)) { + numBits = MINIMUM_BLOOM_FILTER_BYTES << 3; } return numBits; } /** - * used to decide if we want to work to the next page - * @return Bytes buffered of bloom filter. + * Get the number of bytes for bitset in this Bloom filter. + * + * @return The number of bytes for bitset in this Bloom filter. */ - public long getBufferedSize() { + public long getBitsetSize() { return bitset.length; } /** * Compute hash for int value by using its plain encoding result. 
+ * * @param value the value to hash * @return hash result */ @@ -304,6 +328,7 @@ public long hash(int value) { /** * Compute hash for long value by using its plain encoding result. + * * @param value the value to hash * @return hash result */ @@ -315,6 +340,7 @@ public long hash(long value) { /** * Compute hash for double value by using its plain encoding result. + * * @param value the value to hash * @return hash result */ @@ -326,6 +352,7 @@ public long hash(double value) { /** * Compute hash for float value by using its plain encoding result. + * * @param value the value to hash * @return hash result */ @@ -337,6 +364,7 @@ public long hash(float value) { /** * Compute hash for Binary value by using its plain encoding result. + * * @param value the value to hash * @return hash result */ diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloomFilter.java similarity index 55% rename from parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java rename to parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloomFilter.java index 5e00d6060e..965c32c6ac 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloom.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloomFilter.java @@ -34,80 +34,82 @@ import static org.junit.Assert.assertTrue; -public class TestBloom { +public class TestBloomFilter { + + @Test + public void testConstructor () throws IOException { + BloomFilter bloomFilter1 = new BloomFilter(0); + assertEquals(bloomFilter1.getBitsetSize(), BloomFilter.MINIMUM_BLOOM_FILTER_BYTES); + + BloomFilter bloomFilter2 = new BloomFilter(256 * 1024 * 1024); + assertEquals(bloomFilter2.getBitsetSize(), BloomFilter.MAXIMUM_BLOOM_FILTER_BYTES); + + BloomFilter bloomFilter3 = new BloomFilter(1000); + assertEquals(bloomFilter3.getBitsetSize(), 
1024); + } + + /* + * This test is used to test basic operations including inserting, finding and + * serializing and de-serializing. + */ @Test - public void testIntBloom () throws IOException { - Bloom bloom = new Bloom(279); - assertEquals("bloom filter size should be adjust to 512 bytes if input bytes is 279 bytes", - bloom.getBufferedSize(), 512); + public void testBasic () throws IOException { + BloomFilter bloomFilter = new BloomFilter(512); - for(int i = 0; i<10; i++) { - bloom.insert(bloom.hash(i)); + for(int i = 0; i < 10; i++) { + bloomFilter.insert(bloomFilter.hash(i)); } - ByteArrayOutputStream baos = new ByteArrayOutputStream((int)bloom.getBufferedSize() + bloom.HEADER_SIZE); - bloom.writeTo(baos); + ByteArrayOutputStream baos = new ByteArrayOutputStream((int) bloomFilter.getBitsetSize() + + BloomFilter.HEADER_SIZE); + bloomFilter.writeTo(baos); ByteBuffer bloomBuffer = ByteBuffer.wrap(baos.toByteArray()); int length = Integer.reverseBytes(bloomBuffer.getInt()); + assertEquals(length, 512); + int hash = Integer.reverseBytes(bloomBuffer.getInt()); + assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal()); + int algorithm = Integer.reverseBytes(bloomBuffer.getInt()); + assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal()); byte[] bitset = new byte[length]; bloomBuffer.get(bitset); - bloom = new Bloom(bitset); + bloomFilter = new BloomFilter(bitset); for(int i = 0; i < 10; i++) { - assertTrue(bloom.find(bloom.hash(i))); + assertTrue(bloomFilter.find(bloomFilter.hash(i))); } } @Test - public void testBinaryBloom() throws IOException { - int totalCount = 100000; - double fpp = 0.01; + public void testFPP() throws IOException { + final int totalCount = 100000; + final double FPP = 0.01; final long SEED = 104729; - Bloom binaryBloom = new Bloom(Bloom.optimalNumOfBits(totalCount, fpp)); + BloomFilter bloomFilter = new BloomFilter(BloomFilter.optimalNumOfBits(totalCount, FPP)); List strings = new ArrayList<>(); RandomStr randomStr = new 
RandomStr(new Random(SEED)); for(int i = 0; i < totalCount; i++) { String str = randomStr.get(10); strings.add(str); - binaryBloom.insert(binaryBloom.hash(Binary.fromString(str))); - } - - ByteArrayOutputStream baos = new ByteArrayOutputStream( - (int)binaryBloom.getBufferedSize() + binaryBloom.HEADER_SIZE); - binaryBloom.writeTo(baos); - - ByteBuffer bloomBuffer = ByteBuffer.wrap(baos.toByteArray()); - - int length = Integer.reverseBytes(bloomBuffer.getInt()); - int hash = Integer.reverseBytes(bloomBuffer.getInt()); - int algorithm = Integer.reverseBytes(bloomBuffer.getInt()); - - byte[] bitset = new byte[length]; - bloomBuffer.get(bitset); - - binaryBloom = new Bloom(bitset); - - for(int i = 0; i < strings.size(); i++) { - assertTrue(binaryBloom.find(binaryBloom.hash(Binary.fromString(strings.get(i))))); + bloomFilter.insert(bloomFilter.hash(Binary.fromString(str))); } - // exist can be true at probability 0.01. + // The exist is a counter which is increased by one when find return true. int exist = 0; for (int i = 0; i < totalCount; i++) { String str = randomStr.get(8); - if (binaryBloom.find(binaryBloom.hash(Binary.fromString(str)))) { + if (bloomFilter.find(bloomFilter.hash(Binary.fromString(str)))) { exist ++; } } - // exist should be probably less than 1000 according default FPP 0.01. - assertTrue(exist < totalCount*fpp); + // The exist should be probably less than 1000 according FPP 0.01. 
+ assertTrue(exist < totalCount * FPP); } } From 53f22e0807b0e74bb0410de21a259bcca89b0ccd Mon Sep 17 00:00:00 2001 From: "Chen, Junjie" Date: Tue, 17 Jul 2018 21:45:27 +0800 Subject: [PATCH 5/5] PARQUET-1342: update murmur3 seed value --- .../{bloom => bloomfilter}/BloomFilter.java | 25 ++++------ .../TestBloomFilter.java | 50 ++++++++++++------- 2 files changed, 42 insertions(+), 33 deletions(-) rename parquet-column/src/test/java/org/apache/parquet/column/values/{bloom => bloomfilter}/BloomFilter.java (95%) rename parquet-column/src/test/java/org/apache/parquet/column/values/{bloom => bloomfilter}/TestBloomFilter.java (66%) diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/BloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java similarity index 95% rename from parquet-column/src/test/java/org/apache/parquet/column/values/bloom/BloomFilter.java rename to parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java index e54c7ae32b..8d9c1d9fbd 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/BloomFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.parquet.column.values.bloom; +package org.apache.parquet.column.values.bloomfilter; import java.io.IOException; import java.io.OutputStream; @@ -24,10 +24,10 @@ import java.nio.ByteOrder; import java.nio.IntBuffer; -import com.google.common.hash.Hashing; import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; import org.apache.parquet.Preconditions; -import org.apache.parquet.bytes.*; +import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.io.api.Binary; /** @@ -53,25 +53,22 @@ public enum Algorithm { BLOCK, } - // The Bloom filter header includes the number of bytes, hash strategy and algorithm. - public static final int HEADER_SIZE = 12; - // Bytes in a tiny Bloom filter block. - public static final int BYTES_PER_FILTER_BLOCK = 32; + private static final int BYTES_PER_FILTER_BLOCK = 32; - // Default seed for hash function, it comes from Murmur3 from Hive. - public static final int DEFAULT_SEED = 104729; + // Default seed for hash function, it comes from System.nanoTime(). + private static final int DEFAULT_SEED = 1361930890; - // Minimum Bloom filter size, it sets to x86_64 cache alignment. - public static final int MINIMUM_BLOOM_FILTER_BYTES = 64; - - // The number of bits to set in a tiny Bloom filter - public static final int BITS_SET_PER_BLOCK = 8; + // Minimum Bloom filter size, set to size of a tiny Bloom filter block + public static final int MINIMUM_BLOOM_FILTER_BYTES = 32; // Maximum Bloom filter size, it sets to default HDFS block size for upper boundary check // This should be re-consider when implementing write side logic. public static final int MAXIMUM_BLOOM_FILTER_BYTES = 128 * 1024 * 1024; + // The number of bits to set in a tiny Bloom filter + private static final int BITS_SET_PER_BLOCK = 8; + // Hash strategy used in this Bloom filter. 
public final HashStrategy hashStrategy; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBloomFilter.java similarity index 66% rename from parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloomFilter.java rename to parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBloomFilter.java index 965c32c6ac..ab4d89bdd5 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloom/TestBloomFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBloomFilter.java @@ -17,23 +17,27 @@ * under the License. */ -package org.apache.parquet.column.values.bloom; +package org.apache.parquet.column.values.bloomfilter; -import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.parquet.column.values.RandomStr; import org.apache.parquet.io.api.Binary; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - public class TestBloomFilter { @Test @@ -48,40 +52,48 @@ public void testConstructor () throws IOException { assertEquals(bloomFilter3.getBitsetSize(), 1024); } + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); /* * This test is used to test basic operations including inserting, finding and * serializing and de-serializing. 
*/ @Test public void testBasic () throws IOException { - BloomFilter bloomFilter = new BloomFilter(512); + final String testStrings[] = {"hello", "parquet", "bloom", "filter"}; + BloomFilter bloomFilter = new BloomFilter(1024); - for(int i = 0; i < 10; i++) { - bloomFilter.insert(bloomFilter.hash(i)); + for(int i = 0; i < testStrings.length; i++) { + bloomFilter.insert(bloomFilter.hash(Binary.fromString(testStrings[i]))); } - ByteArrayOutputStream baos = new ByteArrayOutputStream((int) bloomFilter.getBitsetSize() + - BloomFilter.HEADER_SIZE); - bloomFilter.writeTo(baos); + File testFile = temp.newFile(); + FileOutputStream fileOutputStream = new FileOutputStream(testFile); + bloomFilter.writeTo(fileOutputStream); + fileOutputStream.close(); + + FileInputStream fileInputStream = new FileInputStream(testFile); - ByteBuffer bloomBuffer = ByteBuffer.wrap(baos.toByteArray()); + byte[] value = new byte[4]; - int length = Integer.reverseBytes(bloomBuffer.getInt()); - assertEquals(length, 512); + fileInputStream.read(value); + int length = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt(); + assertEquals(length, 1024); - int hash = Integer.reverseBytes(bloomBuffer.getInt()); + fileInputStream.read(value); + int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt(); assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal()); - int algorithm = Integer.reverseBytes(bloomBuffer.getInt()); + fileInputStream.read(value); + int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt(); assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal()); byte[] bitset = new byte[length]; - bloomBuffer.get(bitset); - + fileInputStream.read(bitset); bloomFilter = new BloomFilter(bitset); - for(int i = 0; i < 10; i++) { - assertTrue(bloomFilter.find(bloomFilter.hash(i))); + for(int i = 0; i < testStrings.length; i++) { + assertTrue(bloomFilter.find(bloomFilter.hash(Binary.fromString(testStrings[i])))); } } @@ -100,7 +112,7 @@ 
public void testFPP() throws IOException { bloomFilter.insert(bloomFilter.hash(Binary.fromString(str))); } - // The exist is a counter which is increased by one when find return true. + // The exist counter tracks the number of times find() returns true. int exist = 0; for (int i = 0; i < totalCount; i++) { String str = randomStr.get(8);