diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 94285088ab..bce3cf149d 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -58,7 +58,11 @@ fastutil ${fastutil.version} - + + net.openhft + zero-allocation-hashing + 0.9 + com.carrotsearch junit-benchmarks diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index b44791241c..519c0f3cee 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,6 +29,7 @@ import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; @@ -36,7 +37,9 @@ import org.apache.parquet.schema.MessageType; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * This class represents all the configurable Parquet properties. @@ -52,6 +55,7 @@ public class ParquetProperties { public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64; public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000; + public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; @@ -94,13 +98,16 @@ public static WriterVersion fromString(String name) { // The key-value pair represents the column name and its expected distinct number of values in a row group. 
private final Map&lt;String, Long&gt; bloomFilterExpectedDistinctNumbers; + private final int maxBloomFilterBytes; + private final Set&lt;String&gt; bloomFilterColumns; private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator, ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit, - boolean pageWriteChecksumEnabled, Map&lt;String, Long&gt; bloomFilterExpectedDistinctNumber) { + boolean pageWriteChecksumEnabled, Map&lt;String, Long&gt; bloomFilterExpectedDistinctNumber, + Set&lt;String&gt; bloomFilterColumns, int maxBloomFilterBytes) { this.pageSizeThreshold = pageSize; this.initialSlabSize = CapacityByteArrayOutputStream .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10); @@ -115,6 +122,8 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag this.valuesWriterFactory = writerFactory; this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength; this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber; + this.bloomFilterColumns = bloomFilterColumns; + this.maxBloomFilterBytes = maxBloomFilterBytes; this.pageRowCountLimit = pageRowCountLimit; this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; } @@ -178,12 +187,13 @@ public ByteBufferAllocator getAllocator() { } public ColumnWriteStore newColumnWriteStore(MessageType schema, - PageWriteStore pageStore) { + PageWriteStore pageStore, + BloomFilterWriteStore bloomFilterWriteStore) { switch (writerVersion) { case PARQUET_1_0: - return new ColumnWriteStoreV1(schema, pageStore, this); + return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this); case PARQUET_2_0: - return new ColumnWriteStoreV2(schema, pageStore, this); + return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this); default: throw new IllegalArgumentException("unknown version " + writerVersion); } @@ -221,6 +231,14 @@ public Map&lt;String, Long&gt; getBloomFilterColumnExpectedNDVs() { return bloomFilterExpectedDistinctNumbers; } + public Set&lt;String&gt; getBloomFilterColumns() { + return bloomFilterColumns; + } + + public int getMaxBloomFilterBytes() { + return maxBloomFilterBytes; + } + public static Builder builder() { return new Builder(); } @@ -241,6 +259,8 @@ public static class Builder { private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; private Map&lt;String, Long&gt; bloomFilterColumnExpectedNDVs = new HashMap&lt;&gt;(); + private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES; + private Set&lt;String&gt; bloomFilterColumns = new HashSet&lt;&gt;(); private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; @@ -260,6 +280,8 @@ private Builder(ParquetProperties toCopy) { this.pageRowCountLimit = toCopy.pageRowCountLimit; this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers; + this.bloomFilterColumns = toCopy.bloomFilterColumns; + this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes; } /** @@ -349,12 +371,34 @@ public Builder withColumnIndexTruncateLength(int length) { } /** - * Set Bloom filter info for columns. + * Set the maximum number of bytes for a column's Bloom filter bitset. + * + * @param maxBloomFilterBytes the maximum number of bytes a Bloom filter bitset may use for a column.
+ * @return this builder for method chaining + */ + public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) { + this.maxBloomFilterBytes = maxBloomFilterBytes; + return this; + } + + /** + * Set Bloom filter column names. + * + * @param columns the columns for which the Bloom filter is enabled. + * @return this builder for method chaining + */ + public Builder withBloomFilterColumnNames(Set&lt;String&gt; columns) { + this.bloomFilterColumns = columns; + return this; + } + + /** + * Set the expected number of distinct values for the Bloom filter columns. * * @param columnExpectedNDVs the columns expected number of distinct values in a row group * @return this builder for method chaining */ - public Builder withBloomFilterInfo(Map&lt;String, Long&gt; columnExpectedNDVs) { + public Builder withBloomFilterColumnNdvs(Map&lt;String, Long&gt; columnExpectedNDVs) { this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs; return this; } @@ -375,7 +419,8 @@ public ParquetProperties build() { new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck, estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, - pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs); + pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs, + bloomFilterColumns, maxBloomFilterBytes); // we pass a constructed but uninitialized factory to ParquetProperties above as currently // creation of ValuesWriters is invoked from within ParquetProperties. In the future // we'd like to decouple that and won't need to pass an object to properties and then pass the diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java index 953b63e9b9..590c3edcf2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java @@ -22,6 +22,7 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.schema.MessageType; @@ -31,6 +32,12 @@ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, Par super(schema, pageWriteStore, props); } + public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, + BloomFilterWriteStore bloomFilterWriteStore, + ParquetProperties props) { + super(schema, pageWriteStore, bloomFilterWriteStore, props); + } + @Override ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter, ParquetProperties props) { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index 73f613874b..b3b799af32 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Set; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriter; @@ -85,14 +86,27 @@ abstract class ColumnWriterBase implements ColumnWriter {
if (path.getPath().length != 1 || bloomFilterWriter == null) { return; } + String column = path.getPath()[0]; this.bloomFilterWriter = bloomFilterWriter; + Set&lt;String&gt; bloomFilterColumns = props.getBloomFilterColumns(); + if (!bloomFilterColumns.contains(column)) { + return; + } + int maxBloomFilterSize = props.getMaxBloomFilterBytes(); + Map&lt;String, Long&gt; bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs(); - String column = path.getPath()[0]; - if (bloomFilterColumnExpectedNDVs.keySet().contains(column)) { - int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(), - BlockSplitBloomFilter.DEFAULT_FPP); - this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits/8); + if (bloomFilterColumnExpectedNDVs.size() > 0) { + // If the user specified the expected NDV for this column, size the Bloom filter from it. + if (bloomFilterColumnExpectedNDVs.keySet().contains(column)) { + int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(), + BlockSplitBloomFilter.DEFAULT_FPP); + + this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize); + } + } else { + this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java index 0aac63ee9f..aa68cc160f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java index d8ac0b435a..cc9f674280 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java @@ -19,8 +19,6 @@ package org.apache.parquet.column.values.bloomfilter; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.io.api.Binary; @@ -42,28 +40,27 @@ public class BlockSplitBloomFilter implements BloomFilter { // Bytes in a tiny Bloom filter block. private static final int BYTES_PER_BLOCK = 32; - // Default seed for the hash function. It comes from System.nanoTime(). - private static final int DEFAULT_SEED = 1361930890; + // Bits in a tiny Bloom filter block.
+ private static final int BITS_PER_BLOCK = 256; - // Minimum Bloom filter size, set to the size of a tiny Bloom filter block - public static final int MINIMUM_BYTES = 32; + // Default minimum Bloom filter size, set to the size of a tiny Bloom filter block + public static final int DEFAULT_MINIMUM_BYTES = 32; - // Maximum Bloom filter size, set to the default HDFS block size for upper boundary check - // This should be re-consider when implementing write side logic. - public static final int MAXIMUM_BYTES = 128 * 1024 * 1024; + // Default Maximum Bloom filter size, set to 1MB which should cover most cases. + public static final int DEFAULT_MAXIMUM_BYTES = 1024 * 1024; // The number of bits to set in a tiny Bloom filter private static final int BITS_SET_PER_BLOCK = 8; - // The metadata in the header of a serialized Bloom filter is three four-byte values: the number of bytes, - // the filter algorithm, and the hash algorithm. - public static final int HEADER_SIZE = 12; + // The metadata in the header of a serialized Bloom filter is four four-byte values: the number of bytes, + // the filter algorithm, the hash algorithm, and the compression. + public static final int HEADER_SIZE = 16; // The default false positive probability value public static final double DEFAULT_FPP = 0.01; // Hash strategy used in this Bloom filter. - public final HashStrategy hashStrategy; + private final HashStrategy hashStrategy; // The underlying byte array for Bloom filter bitset. private byte[] bitset; @@ -74,51 +71,84 @@ public class BlockSplitBloomFilter implements BloomFilter { // Hash function use to compute hash for column value. private HashFunction hashFunction; + private int maximumBytes = DEFAULT_MAXIMUM_BYTES; + private int minimumBytes = DEFAULT_MINIMUM_BYTES; + // The block-based algorithm needs 8 odd SALT values to calculate eight indexes // of bits to set, one per 32-bit word. - private static final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, + private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; /** - * Constructor of Bloom filter. + * Constructor of block-based Bloom filter. * * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within - * [MINIMUM_BYTES, MAXIMUM_BYTES], it will be rounded up/down + * [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down * to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power - * of 2. It uses murmur3_x64_128 as its default hash function. + * of 2. It uses XXH64 as its default hash function. */ public BlockSplitBloomFilter(int numBytes) { - this(numBytes, HashStrategy.MURMUR3_X64_128); + this(numBytes, DEFAULT_MAXIMUM_BYTES, HashStrategy.XXH64); } /** - * Constructor of block-based Bloom filter. It uses murmur3_x64_128 as its default hash - * function. + * Constructor of block-based Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within + * [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down + * to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power + * of 2. It uses XXH64 as its default hash function. + * @param maximumBytes The maximum bytes of the Bloom filter. + */ + public BlockSplitBloomFilter(int numBytes, int maximumBytes) { + this(numBytes, maximumBytes, HashStrategy.XXH64); + } + + /** + * Constructor of block-based Bloom filter. 
* * @param numBytes The number of bytes for Bloom filter bitset * @param hashStrategy The hash strategy of Bloom filter. */ private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) { + this(numBytes, DEFAULT_MAXIMUM_BYTES, hashStrategy); + } + + /** + * Constructor of block-based Bloom filter. + * + * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within + * [DEFAULT_MINIMUM_BYTES, maximumBytes], it will be rounded up/down to lower/upper bound if + * num_bytes is out of range. It will also be rounded up to a power of 2. + * @param maximumBytes The maximum bytes of the Bloom filter. + * @param hashStrategy The adopted hash strategy of the Bloom filter. + */ + public BlockSplitBloomFilter(int numBytes, int maximumBytes, HashStrategy hashStrategy) { + if (maximumBytes > DEFAULT_MINIMUM_BYTES) { + this.maximumBytes = maximumBytes; + } initBitset(numBytes); + switch (hashStrategy) { - case MURMUR3_X64_128: + case XXH64: this.hashStrategy = hashStrategy; - hashFunction = Hashing.murmur3_128(DEFAULT_SEED); + hashFunction = new XxHash(); break; default: throw new RuntimeException("Unsupported hash strategy"); } } + /** * Construct the Bloom filter with given bitset, it is used when reconstructing - * Bloom filter from parquet file. It use murmur3_x64_128 as its default hash + * Bloom filter from parquet file. It use XXH64 as its default hash * function. * * @param bitset The given bitset to construct Bloom filter. */ public BlockSplitBloomFilter(byte[] bitset) { - this(bitset, HashStrategy.MURMUR3_X64_128); + this(bitset, HashStrategy.XXH64); } /** @@ -136,9 +166,9 @@ private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) { this.bitset = bitset; this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); switch (hashStrategy) { - case MURMUR3_X64_128: + case XXH64: this.hashStrategy = hashStrategy; - hashFunction = Hashing.murmur3_128(DEFAULT_SEED); + hashFunction = new XxHash(); break; default: throw new RuntimeException("Unsupported hash strategy"); @@ -149,21 +179,21 @@ private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) { * Create a new bitset for Bloom filter. * * @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within - * [MINIMUM_BYTES, MAXIMUM_BYTES], it will be rounded up/down + * [minimumBytes, maximumBytes], it will be rounded up/down * to lower/upper bound if num_bytes is out of range and also will rounded up to a power - * of 2. It uses murmur3_x64_128 as its default hash function and block-based algorithm + * of 2. It uses XXH64 as its default hash function and block-based algorithm * as default algorithm. */ private void initBitset(int numBytes) { - if (numBytes < MINIMUM_BYTES) { - numBytes = MINIMUM_BYTES; + if (numBytes < minimumBytes) { + numBytes = minimumBytes; } // Get next power of 2 if it is not power of 2. 
if ((numBytes & (numBytes - 1)) != 0) { numBytes = Integer.highestOneBit(numBytes) << 1; } - if (numBytes > MAXIMUM_BYTES || numBytes < 0) { - numBytes = MAXIMUM_BYTES; + if (numBytes > maximumBytes || numBytes < 0) { + numBytes = maximumBytes; } this.bitset = new byte[numBytes]; this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); @@ -177,12 +207,14 @@ public void writeTo(OutputStream out) throws IOException { out.write(BytesUtils.intToBytes(hashStrategy.value)); // Write algorithm out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value)); + // Write compression + out.write(BytesUtils.intToBytes(Compression.UNCOMPRESSED.value)); // Write bitset out.write(bitset); } private int[] setMask(int key) { - int mask[] = new int[BITS_SET_PER_BLOCK]; + int[] mask = new int[BITS_SET_PER_BLOCK]; for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { mask[i] = key * SALT[i]; @@ -199,27 +231,31 @@ private int[] setMask(int key) { @Override public void insertHash(long hash) { - int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1); + long numBlocks = bitset.length / BYTES_PER_BLOCK; + long lowHash = hash >>> 32; + int blockIndex = (int)((lowHash * numBlocks) >> 32); int key = (int)hash; // Calculate mask for bucket. - int mask[] = setMask(key); + int[] mask = setMask(key); for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { - int value = intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i); + int value = intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i); value |= mask[i]; - intBuffer.put(bucketIndex * (BYTES_PER_BLOCK / 4) + i, value); + intBuffer.put(blockIndex * (BYTES_PER_BLOCK / 4) + i, value); } } @Override public boolean findHash(long hash) { - int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1); + long numBlocks = bitset.length / BYTES_PER_BLOCK; + long lowHash = hash >>> 32; + int blockIndex = (int)((lowHash * numBlocks) >> 32); int key = (int)hash; // Calculate mask for the tiny Bloom filter. - int mask[] = setMask(key); + int[] mask = setMask(key); for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { - if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) { + if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) { return false; } } @@ -238,19 +274,19 @@ public static int optimalNumOfBits(long n, double p) { Preconditions.checkArgument((p > 0.0 && p < 1.0), "FPP should be less than 1.0 and great than 0.0"); final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8)); - final double MAX = MAXIMUM_BYTES << 3; + final double MAX = DEFAULT_MAXIMUM_BYTES << 3; int numBits = (int)m; // Handle overflow. if (m > MAX || m < 0) { numBits = (int)MAX; } - // Get next power of 2 if bits is not power of 2. 
- if ((numBits & (numBits - 1)) != 0) { - numBits = Integer.highestOneBit(numBits) << 1; - } - if (numBits < (MINIMUM_BYTES << 3)) { - numBits = MINIMUM_BYTES << 3; + + // Round up to a multiple of BITS_PER_BLOCK + numBits = (numBits + BITS_PER_BLOCK - 1) & ~(BITS_PER_BLOCK - 1); + + if (numBits < (DEFAULT_MINIMUM_BYTES << 3)) { + numBits = DEFAULT_MINIMUM_BYTES << 3; } return numBits; @@ -266,58 +302,58 @@ public long hash(Object value) { ByteBuffer plain; if (value instanceof Binary) { - return hashFunction.hashBytes(((Binary) value).getBytes()).asLong(); + return hashFunction.hashBytes(((Binary) value).getBytes()); } if (value instanceof Integer) { plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE); - plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value).intValue()); + plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value)); } else if (value instanceof Long) { plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE); - plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value).longValue()); + plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value)); } else if (value instanceof Float) { plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE); - plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value).floatValue()); + plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value)); } else if (value instanceof Double) { plain = ByteBuffer.allocate(Double.SIZE/ Byte.SIZE); - plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value).doubleValue()); + plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value)); } else { throw new RuntimeException("Parquet Bloom filter: Not supported type"); } - return hashFunction.hashBytes(plain.array()).asLong(); + return hashFunction.hashByteBuffer(plain); } @Override public long hash(int value) { ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE); plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value); - return hashFunction.hashBytes(plain.array()).asLong(); + return hashFunction.hashByteBuffer(plain); } @Override public long hash(long value) { ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE); plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value); - return hashFunction.hashBytes(plain.array()).asLong(); + return hashFunction.hashByteBuffer(plain); } @Override public long hash(double value) { ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE); plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value); - return hashFunction.hashBytes(plain.array()).asLong(); + return hashFunction.hashByteBuffer(plain); } @Override public long hash(float value) { ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE); plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value); - return hashFunction.hashBytes(plain.array()).asLong(); + return hashFunction.hashByteBuffer(plain); } @Override public long hash(Binary value) { - return hashFunction.hashBytes(value.getBytes()).asLong(); + return hashFunction.hashBytes(value.getBytes()); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java index a6e548ffb2..8b26c974c1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java @@ -29,9 +29,15 @@ * a hash strategy and a Bloom filter algorithm. */ public interface BloomFilter { - // Bloom filter Hash strategy. + /* Bloom filter Hash strategy.
+ * + * xxHash is an extremely fast hash algorithm, running at RAM speed limits. It successfully + * completes the SMHasher test suite, which evaluates the collision, dispersion and randomness qualities + * of hash functions, and its benchmark results show a good performance advantage. + * (see https://github.com/Cyan4973/xxHash). + */ enum HashStrategy { - MURMUR3_X64_128(0); + XXH64(0); HashStrategy(int value) { this.value = value; } @@ -47,6 +53,15 @@ enum Algorithm { int value; } + // Bloom filter compression. + enum Compression { + UNCOMPRESSED(0); + Compression(int value) { + this.value = value; + } + int value; + } + /** * Write the Bloom filter to an output stream. It writes the Bloom filter header including the * bitset's length in bytes, the hash strategy, the algorithm, and the bitset. diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java index 0fab73b2a4..e2504d8216 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java @@ -1,5 +1,3 @@ - - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java new file mode 100644 index 0000000000..2043934fb2 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.bloomfilter; + +import java.nio.ByteBuffer; + +/** + * An interface for the hash functions used by the Bloom filter. + */ +public interface HashFunction { + + /** + * Compute the hash value for a byte array. + * @param input the input byte array + * @return the hash value as a long. + */ + long hashBytes(byte[] input); + + /** + * Compute the hash value for a ByteBuffer. + * @param input the input ByteBuffer + * @return the hash value as a long.
+ */ + long hashByteBuffer(ByteBuffer input); +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java new file mode 100644 index 0000000000..6c52b3c987 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.column.values.bloomfilter; + +import net.openhft.hashing.LongHashFunction; + +import java.nio.ByteBuffer; + +/** + * The implementation of HashFunction interface. The XxHash uses XXH64 version xxHash + * with a seed of 0. + */ +public class XxHash implements HashFunction { + @Override + public long hashBytes(byte[] input) { + return LongHashFunction.xx(0).hashBytes(input); + } + + @Override + public long hashByteBuffer(ByteBuffer input) { + return LongHashFunction.xx(0).hashBytes(input); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java index 8dbb0ba193..d75c0e2639 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java @@ -37,11 +37,11 @@ public class TestBlockSplitBloomFilter { @Test - public void testConstructor () throws IOException { + public void testConstructor () { BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0); - assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.MINIMUM_BYTES); - BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.MAXIMUM_BYTES + 1); - assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.MAXIMUM_BYTES); + assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MINIMUM_BYTES); + BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES + 1); + assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES); BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000); assertEquals(bloomFilter3.getBitsetSize(), 1024); } @@ -55,7 +55,7 @@ public void testConstructor () throws IOException { */ @Test public void testBasic () throws IOException { - final String testStrings[] = {"hello", "parquet", "bloom", "filter"}; + final String[] testStrings = {"hello", "parquet", "bloom", "filter"}; BloomFilter bloomFilter = new BlockSplitBloomFilter(1024); for(int i = 0; i < testStrings.length; i++) { @@ -75,17 +75,21 @@ public void testBasic () throws IOException 
{ fileInputStream.read(value); int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt(); - assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal()); + assertEquals(hash, BloomFilter.HashStrategy.XXH64.ordinal()); fileInputStream.read(value); int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt(); assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal()); + fileInputStream.read(value); + int compression = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt(); + assertEquals(compression, BloomFilter.Compression.UNCOMPRESSED.ordinal()); + byte[] bitset = new byte[length]; fileInputStream.read(bitset); bloomFilter = new BlockSplitBloomFilter(bitset); - for(int i = 0; i < testStrings.length; i++) { - assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testStrings[i])))); + for (String testString : testStrings) { + assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testString)))); } } diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml index a5dbabbc1d..a05cd2bc84 100644 --- a/parquet-format-structures/pom.xml +++ b/parquet-format-structures/pom.xml @@ -70,7 +70,7 @@ org.apache.thrift thrift-maven-plugin - ${thrift-maven-plugin.version} + ${thrift-maven-plugin.version} ${parquet.thrift.path} ${format.thrift.executable} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index 897c7c2bec..d2e4c966ac 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -38,6 +38,7 @@ import org.apache.parquet.column.page.PageWriter; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; @@ -49,7 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class ColumnChunkPageWriteStore implements PageWriteStore { +class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore { private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class); private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); @@ -321,6 +322,11 @@ public PageWriter getPageWriter(ColumnDescriptor path) { return writers.get(path); } + @Override + public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) { + return writers.get(path); + } + public void flushToFileWriter(ParquetFileWriter writer) throws IOException { for (ColumnDescriptor path : schema.getColumns()) { ColumnChunkPageWriter pageWriter = writers.get(path); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index c3da3239b2..18ee788ce7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -28,6 +28,7 @@ import org.apache.parquet.column.ColumnWriteStore; import 
org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext; @@ -64,6 +65,7 @@ class InternalParquetRecordWriter { private ColumnWriteStore columnStore; private ColumnChunkPageWriteStore pageStore; + private BloomFilterWriteStore bloomFilterWriteStore; private RecordConsumer recordConsumer; /** @@ -101,9 +103,12 @@ public ParquetMetadata getFooter() { } private void initStore() { - pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(), - props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled()); - columnStore = props.newColumnWriteStore(schema, pageStore); + ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor, + schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled()); + pageStore = columnChunkPageWriteStore; + bloomFilterWriteStore = columnChunkPageWriteStore; + + columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); writeSupport.prepareForWrite(recordConsumer); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index ba155584fe..4cd846c264 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1074,12 +1074,12 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException ByteBuffer bloomHeader = ByteBuffer.wrap(bytes); IntBuffer headerBuffer = bloomHeader.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); int numBytes = headerBuffer.get(); - if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.MAXIMUM_BYTES) { + if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES) { return null; } BloomFilter.HashStrategy hash = BloomFilter.HashStrategy.values()[headerBuffer.get()]; - if (hash != BlockSplitBloomFilter.HashStrategy.MURMUR3_X64_128) { + if (hash != BlockSplitBloomFilter.HashStrategy.XXH64) { return null; } @@ -1088,6 +1088,11 @@ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException return null; } + BloomFilter.Compression compression = BloomFilter.Compression.values()[headerBuffer.get()]; + if (compression != BlockSplitBloomFilter.Compression.UNCOMPRESSED) { + return null; + } + byte[] bitset = new byte[numBytes]; f.readFully(bitset); return new BlockSplitBloomFilter(bitset); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 8741aee3da..4a72134d37 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -23,8 +23,11 @@ import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@
-148,6 +151,7 @@ public static enum JobSummaryLevel { public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length"; public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names"; public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv"; + public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; @@ -215,6 +219,20 @@ public static boolean getEnableDictionary(JobContext jobContext) { return getEnableDictionary(getConfiguration(jobContext)); } + public static int getBloomFilterMaxBytes(Configuration conf) { + return conf.getInt(BLOOM_FILTER_MAX_BYTES, + ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES); + } + + public static Set getBloomFilterColumns(Configuration conf) { + String columnNames = conf.get(BLOOM_FILTER_COLUMN_NAMES); + if (columnNames != null) { + return new HashSet<>(Arrays.asList(columnNames.split(","))); + } else { + return new HashSet<>(); + } + } + public static Map getBloomFilterColumnExpectedNDVs(Configuration conf) { Map kv = new HashMap<>(); String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES); @@ -443,7 +461,9 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withPageSize(getPageSize(conf)) .withDictionaryPageSize(getDictionaryPageSize(conf)) .withDictionaryEncoding(getEnableDictionary(conf)) - .withBloomFilterInfo(getBloomFilterColumnExpectedNDVs(conf)) + .withBloomFilterColumnNames(getBloomFilterColumns(conf)) + .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf)) + .withBloomFilterColumnNdvs(getBloomFilterColumnExpectedNDVs(conf)) .withWriterVersion(getWriterVersion(conf)) .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)) .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)) @@ -469,7 +489,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck()); LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck()); LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength()); - LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumnExpectedNDVs().keySet()); + LOG.info("Max Bloom filter size for a column is {}", props.getMaxBloomFilterBytes()); + LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumns()); LOG.info("Bloom filter enabled column expected number of distinct values are: {}", props.getBloomFilterColumnExpectedNDVs().values()); LOG.info("Page row count limit to {}", props.getPageRowCountLimit()); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 7fb71864b7..638d4e7748 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -537,6 +539,21 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { return self(); } + /** + * Enables bloom filter column names for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF withBloomFilterColumnNames(String... columnNames) { + if (columnNames != null) { + encodingPropsBuilder.withBloomFilterColumnNames( + new HashSet<>(Arrays.asList(columnNames)) + ); + } + + return self(); + } + /** * Set a property that will be available to the read path. For writers that use a Hadoop * configuration, this is the recommended way to add configuration values. diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index dd84e63085..3de4524390 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -222,14 +222,14 @@ public void testWriteRead() throws Exception { } @Test - public void testBloomWriteRead() throws Exception { + public void testBloomFilterWriteRead() throws Exception { MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }"); File testFile = temp.newFile(); testFile.delete(); Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); - configuration.set("parquet.bloomFilter.filter.column.names", "foo"); - String colPath[] = {"foo"}; + configuration.set("parquet.bloom.filter.column.names", "foo"); + String[] colPath = {"foo"}; ColumnDescriptor col = schema.getColumnDescription(colPath); BinaryStatistics stats1 = new BinaryStatistics(); ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); @@ -239,19 +239,19 @@ public void testBloomWriteRead() throws Exception { w.writeDataPage(2, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN); w.writeDataPage(3, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN); w.endColumn(); - BloomFilter bloomData = new BlockSplitBloomFilter(0); - bloomData.insertHash(bloomData.hash(Binary.fromString("hello"))); - bloomData.insertHash(bloomData.hash(Binary.fromString("world"))); - w.writeBloomFilter(bloomData); + BloomFilter blockSplitBloomFilter = new BlockSplitBloomFilter(0); + blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))); + blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("world"))); + w.writeBloomFilter(blockSplitBloomFilter); w.endBlock(); - w.end(new HashMap()); + w.end(new HashMap<>()); ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); ParquetFileReader r = new ParquetFileReader(configuration, 
readFooter.getFileMetaData(), path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath))); BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0)); BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0)); - assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("hello")))); - assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("world")))); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello")))); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world")))); } @Test diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 25c9608563..343b1fa768 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,8 @@ package org.apache.parquet.hadoop; import static java.util.Arrays.asList; +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -45,8 +47,11 @@ import java.util.Map; import java.util.concurrent.Callable; +import net.openhft.hashing.LongHashFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.example.data.GroupFactory; import org.apache.parquet.hadoop.example.ExampleInputFormat; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.schema.GroupType; @@ -207,4 +212,43 @@ public void testNullValuesWithPageRowLimit() throws IOException { assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount); } } + + @Test + public void testParquetFileWithBloomFilter() throws IOException { + MessageType schema = Types.buildMessage(). 
+ required(BINARY).as(stringType()).named("name").named("msg"); + + String[] testNames = {"hello", "parquet", "bloom", "filter"}; + + final int recordCount = testNames.length; + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + GroupFactory factory = new SimpleGroupFactory(schema); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + try (ParquetWriter&lt;Group&gt; writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(10) + .withConf(conf) + .withDictionaryEncoding(false) + .withBloomFilterColumnNames("name") + .build()) { + for (String testName : testNames) { + writer.write(factory.newGroup().append("name", testName)); + } + } + + ParquetMetadata footer = readFooter(conf, path, NO_FILTER); + ParquetFileReader reader = new ParquetFileReader( + conf, footer.getFileMetaData(), path, footer.getBlocks(), schema.getColumns()); + + BloomFilter bloomFilter = reader.getBloomFilterDataReader(footer.getBlocks().get(0)) + .readBloomFilter(footer.getBlocks().get(0).getColumns().get(0)); + + for (String name : testNames) { + assertTrue(bloomFilter.findHash( + LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer()))); + } + } } diff --git a/pom.xml b/pom.xml index 5dd4aae7ad..6ea28ba5a2 100644 --- a/pom.xml +++ b/pom.xml @@ -81,7 +81,7 @@ 1.2.1 2.7.1 3.1.2 - 2.7.0-SNAPSHOT + 2.7.0 1.7.0 thrift thrift
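
For reference, a minimal usage sketch (not part of the patch) showing how a writer can enable the new Bloom filter support through the builder API added above. The schema and output path are made up for illustration; MapReduce jobs that go through ParquetOutputFormat would instead set parquet.bloom.filter.column.names, parquet.bloom.filter.expected.ndv and parquet.bloom.filter.max.bytes on the job Configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class BloomFilterWriteExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required binary name (UTF8); }");

    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    // The output path is illustrative only.
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/bloom_example.parquet"))
        .withConf(conf)
        .withDictionaryEncoding(false)
        .withBloomFilterColumnNames("name") // write a Bloom filter for the "name" column
        .build()) {
      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
      for (String value : new String[] {"hello", "parquet", "bloom", "filter"}) {
        writer.write(factory.newGroup().append("name", value));
      }
    }
  }
}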
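
The sizing and block-addressing changes above can be summarized in a standalone sketch. It is illustrative only: the real optimalNumOfBits also clamps the result between the minimum and maximum filter sizes, and BlockSplitBloomFilter addresses blocks inside its own bitset rather than taking a bare byte count.

public class BloomFilterMath {
  static final int BITS_PER_BLOCK = 256;  // a tiny Bloom filter block is 32 bytes
  static final double DEFAULT_FPP = 0.01;

  // Core formula of optimalNumOfBits: bits = -8n / ln(1 - p^(1/8)),
  // rounded up to a whole number of 256-bit blocks (clamping omitted here).
  static int optimalNumOfBits(long n, double p) {
    int numBits = (int) (-8 * n / Math.log(1 - Math.pow(p, 1.0 / 8)));
    return (numBits + BITS_PER_BLOCK - 1) & ~(BITS_PER_BLOCK - 1);
  }

  // The patch maps the upper 32 bits of the 64-bit hash onto [0, numBlocks)
  // with a multiply-shift instead of a power-of-two mask, so the bitset size
  // only has to be a multiple of the block size.
  static int blockIndex(long hash, int numBytes) {
    long numBlocks = numBytes / 32;
    return (int) (((hash >>> 32) * numBlocks) >> 32);
  }

  public static void main(String[] args) {
    int bits = optimalNumOfBits(100_000, DEFAULT_FPP);
    System.out.println((bits / 8) + " bytes for 100,000 distinct values at 1% FPP");
  }
}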
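
Finally, a sketch of the serialized layout implied by writeTo() and readBloomFilter(): a 16-byte little-endian header (bitset length in bytes, hash strategy, algorithm, compression) followed by the raw bitset. The class and method names below are illustrative only.

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class BloomFilterHeaderSketch {
  static byte[] readBitset(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    byte[] header = new byte[16];        // HEADER_SIZE is now 16 bytes
    data.readFully(header);
    ByteBuffer buf = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
    int numBytes = buf.getInt();         // length of the bitset in bytes
    int hash = buf.getInt();             // 0 = XXH64
    int algorithm = buf.getInt();        // 0 = BLOCK
    int compression = buf.getInt();      // 0 = UNCOMPRESSED
    if (numBytes <= 0 || hash != 0 || algorithm != 0 || compression != 0) {
      throw new IOException("unsupported Bloom filter header");
    }
    byte[] bitset = new byte[numBytes];
    data.readFully(bitset);
    return bitset;
  }
}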