diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
index b6378976c3..d8ac0b435a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -18,11 +18,13 @@
  */
 package org.apache.parquet.column.values.bloomfilter;
+
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.io.api.Binary;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -139,7 +141,7 @@ private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) {
         hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
         break;
       default:
-        throw new RuntimeException("Not supported hash strategy");
+        throw new RuntimeException("Unsupported hash strategy");
     }
   }

@@ -254,6 +256,38 @@ public static int optimalNumOfBits(long n, double p) {
     return numBits;
   }

+  @Override
+  public long getBitsetSize() {
+    return this.bitset.length;
+  }
+
+  @Override
+  public long hash(Object value) {
+    ByteBuffer plain;
+
+    if (value instanceof Binary) {
+      return hashFunction.hashBytes(((Binary) value).getBytes()).asLong();
+    }
+
+    if (value instanceof Integer) {
+      plain = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer) value).intValue());
+    } else if (value instanceof Long) {
+      plain = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long) value).longValue());
+    } else if (value instanceof Float) {
+      plain = ByteBuffer.allocate(Float.SIZE / Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float) value).floatValue());
+    } else if (value instanceof Double) {
+      plain = ByteBuffer.allocate(Double.SIZE / Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double) value).doubleValue());
+    } else {
+      throw new RuntimeException("Parquet Bloom filter: unsupported type");
+    }
+
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
   @Override
   public long hash(int value) {
     ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
@@ -286,9 +320,4 @@ public long hash(float value) {
   public long hash(Binary value) {
     return hashFunction.hashBytes(value.getBytes()).asLong();
   }
-
-  @Override
-  public long getBitsetSize() {
-    return this.bitset.length;
-  }
 }
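For reference, a minimal usage sketch (not part of the patch) of how the new hash(Object) overload pairs with findHash; it assumes the class's existing public BlockSplitBloomFilter(int numBytes) constructor and the insertHash(long) method from the BloomFilter interface:

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.io.api.Binary;

public class BloomFilterHashSketch {
  public static void main(String[] args) {
    // Allocate a filter with a 1 KiB bitset.
    BloomFilter filter = new BlockSplitBloomFilter(1024);
    filter.insertHash(filter.hash(Binary.fromString("hello")));

    // hash(Object) dispatches on the runtime type, so filtering code can hash a
    // predicate's literal without knowing the column's primitive type statically.
    Object value = Binary.fromString("hello");
    System.out.println(filter.findHash(filter.hash(value)));         // true: was inserted
    System.out.println(filter.findHash(filter.hash((Object) 42L)));  // false with high probability:
                                                                     // Bloom filters can false-positive,
                                                                     // never false-negative
  }
}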
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
index 3ec192e3e0..a6e548ffb2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.bloomfilter;

 import org.apache.parquet.io.api.Binary;
+
 import java.io.IOException;
 import java.io.OutputStream;

@@ -70,6 +71,13 @@ enum Algorithm {
    */
   boolean findHash(long hash);

+  /**
+   * Get the number of bytes for the bitset in this Bloom filter.
+   *
+   * @return The number of bytes for the bitset in this Bloom filter.
+   */
+  long getBitsetSize();
+
   /**
    * Compute hash for int value by using its plain encoding result.
    *
@@ -111,9 +119,10 @@ enum Algorithm {
   long hash(Binary value);

   /**
-   * Get the number of bytes for bitset in this Bloom filter.
+   * Compute hash for an Object value by using its plain encoding result.
    *
-   * @return The number of bytes for bitset in this Bloom filter.
+   * @param value the value to hash
+   * @return hash result
    */
-  long getBitsetSize();
+  long hash(Object value);
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
deleted file mode 100644
index 3373bc1a0e..0000000000
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.column.values.bloomfilter;
-
-import org.apache.parquet.column.ColumnDescriptor;
-
-/**
- * contains all the bloom filter reader for all columns of a row group
- */
-public interface BloomFilterReadStore {
-  /**
-   * Get a Bloom filter reader of a column
-   *
-   * @param path the descriptor of the column
-   * @return the corresponding Bloom filter writer
-   */
-  BloomFilterReader getBloomFilterReader(ColumnDescriptor path);
-}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
deleted file mode 100644
index 7a430581dd..0000000000
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.column.values.bloomfilter;
-
-import org.apache.parquet.column.ColumnDescriptor;
-
-public interface BloomFilterReader {
-  /**
-   * Returns a {@link BloomFilter} for the given column descriptor.
-   *
-   * @param path the descriptor of the column
-   * @return the bloomFilter dta for that column, or null if there isn't one
-   */
-  BloomFilter readBloomFilter(ColumnDescriptor path);
-}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
new file mode 100644
index 0000000000..c1e377403e
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.BloomFilterLevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.BloomFilterReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+public class BloomFilterImpl implements FilterPredicate.Visitor<Boolean> {
+  private static final Logger LOG = LoggerFactory.getLogger(BloomFilterImpl.class);
+  private static final boolean BLOCK_MIGHT_MATCH = false;
+  private static final boolean BLOCK_CANNOT_MATCH = true;
+
+  private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<>();
+  private final BloomFilterReader bloomFilterReader;
+
+  public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, BloomFilterReader bloomFilterReader) {
+    checkNotNull(pred, "pred");
+    checkNotNull(columns, "columns");
+    return pred.accept(new BloomFilterImpl(columns, bloomFilterReader));
+  }
+
+  private BloomFilterImpl(List<ColumnChunkMetaData> columnsList, BloomFilterReader bloomFilterReader) {
+    for (ColumnChunkMetaData chunk : columnsList) {
+      columns.put(chunk.getPath(), chunk);
+    }
+
+    this.bloomFilterReader = bloomFilterReader;
+  }
+
+  private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+    return columns.get(columnPath);
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Eq<T> eq) {
+    T value = eq.getValue();
+
+    if (value == null) {
+      // The Bloom filter bitset contains only non-null values, so it isn't helpful here.
+      // This could check the column stats, but the StatisticsFilter is responsible for that.
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Operators.Column<T> filterColumn = eq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    if (meta == null) {
+      // The column isn't in this file, so all of its values are null, but the
+      // predicate value must be non-null because of the check above.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    try {
+      BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta);
+      if (bloomFilter != null && !bloomFilter.findHash(bloomFilter.hash(value))) {
+        return BLOCK_CANNOT_MATCH;
+      }
+    } catch (RuntimeException e) {
+      LOG.warn(e.getMessage());
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.NotEq<T> notEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Lt<T> lt) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.LtEq<T> ltEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Gt<T> gt) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.GtEq<T> gtEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public Boolean visit(Operators.And and) {
+    // An And is drop-able if either side can be dropped, because a matching
+    // record would have to match both sides.
+    return and.getLeft().accept(this) || and.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Operators.Or or) {
+    // An Or can be dropped only if both sides can be dropped.
+    return or.getLeft().accept(this) && or.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Operators.Not not) {
+    throw new IllegalArgumentException(
+        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> ud, boolean inverted) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> udp) {
+    return visit(udp, false);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.LogicalNotUserDefined<T, U> udp) {
+    return visit(udp.getUserDefined(), true);
+  }
+}
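To make the drop semantics concrete, a hedged usage sketch (not part of the patch): canDrop returns true only when the Bloom filter proves no row in the block can equal the predicate's literal. The "name" column is hypothetical; block and reader stand in for a row group's BlockMetaData and an open ParquetFileReader.

import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;

import org.apache.parquet.filter2.BloomFilterLevel.BloomFilterImpl;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.io.api.Binary;

public class CanDropSketch {
  // True when the row group's Bloom filter rules out name == "hello".
  static boolean canSkip(BlockMetaData block, ParquetFileReader reader) {
    FilterPredicate pred = eq(binaryColumn("name"), Binary.fromString("hello"));
    return BloomFilterImpl.canDrop(pred, block.getColumns(), reader.getBloomFilterDataReader(block));
  }
}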
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index d1d40e9d78..fe6f637bff 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.List;

+import org.apache.parquet.filter2.BloomFilterLevel.BloomFilterImpl;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
 import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
@@ -32,6 +33,8 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import static org.apache.parquet.Preconditions.checkNotNull;

@@ -45,10 +48,12 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
   private final MessageType schema;
   private final List<FilterLevel> levels;
   private final ParquetFileReader reader;
+  private static final Logger LOG = LoggerFactory.getLogger(RowGroupFilter.class);

   public enum FilterLevel {
     STATISTICS,
-    DICTIONARY
+    DICTIONARY,
+    BLOOMFILTER
   }

   /**
@@ -104,6 +109,11 @@ public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
       drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block));
     }

+    if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) {
+      drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block));
+      if (drop) LOG.info("Block dropped by Bloom filter");
+    }
+
     if(!drop) {
       filteredBlocks.add(block);
     }
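A sketch of how a caller might opt into the new filter level (not part of the patch); it assumes RowGroupFilter's existing static filterRowGroups(levels, filter, blocks, reader) entry point:

import java.util.Arrays;
import java.util.List;

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

public class BloomFilterLevelSketch {
  // Keep only row groups that survive all three pruning levels.
  static List<BlockMetaData> prune(FilterCompat.Filter filter, List<BlockMetaData> blocks, ParquetFileReader reader) {
    return RowGroupFilter.filterRowGroups(
        Arrays.asList(RowGroupFilter.FilterLevel.STATISTICS,
            RowGroupFilter.FilterLevel.DICTIONARY,
            RowGroupFilter.FilterLevel.BLOOMFILTER),
        filter, blocks, reader);
  }
}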
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
similarity index 51%
rename from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
index 96e258fe40..3ad91ce01e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
@@ -17,55 +17,50 @@
  * under the License.
  */
 package org.apache.parquet.hadoop;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.parquet.Strings;
-import org.apache.parquet.column.ColumnDescriptor;
+
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
 /**
- * A {@link BloomFilterReader} implementation that reads Bloom filter data from
- * an open {@link ParquetFileReader}.
+ * Bloom filter reader that reads Bloom filter data from an open {@link ParquetFileReader}.
  *
  */
-public class BloomFilterDataReader implements BloomFilterReader {
+public class BloomFilterReader {
   private final ParquetFileReader reader;
-  private final Map<String, ColumnChunkMetaData> columns;
-  private final Map<String, BloomFilter> cache = new HashMap<>();
-  public BloomFilterDataReader(ParquetFileReader fileReader, BlockMetaData block) {
+  private final Map<ColumnPath, ColumnChunkMetaData> columns;
+  private final Map<ColumnPath, BloomFilter> cache = new HashMap<>();
+
+  public BloomFilterReader(ParquetFileReader fileReader, BlockMetaData block) {
     this.reader = fileReader;
     this.columns = new HashMap<>();
     for (ColumnChunkMetaData column : block.getColumns()) {
-      columns.put(column.getPath().toDotString(), column);
+      columns.put(column.getPath(), column);
     }
   }

-  @Override
-  public BloomFilter readBloomFilter(ColumnDescriptor descriptor) {
-    String dotPath = Strings.join(descriptor.getPath(), ".");
-    ColumnChunkMetaData column = columns.get(dotPath);
-    if (column == null) {
-      throw new ParquetDecodingException(
-          "Cannot load Bloom filter data, unknown column: " + dotPath);
-    }
-    if (cache.containsKey(dotPath)) {
-      return cache.get(dotPath);
+  public BloomFilter readBloomFilter(ColumnChunkMetaData meta) {
+    if (cache.containsKey(meta.getPath())) {
+      return cache.get(meta.getPath());
     }
     try {
       synchronized (cache) {
-        if (!cache.containsKey(dotPath)) {
-          BloomFilter bloomFilter = reader.readBloomFilter(column);
+        if (!cache.containsKey(meta.getPath())) {
+          BloomFilter bloomFilter = reader.readBloomFilter(meta);
           if (bloomFilter == null) return null;
-          cache.put(dotPath, bloomFilter);
+          cache.put(meta.getPath(), bloomFilter);
         }
       }
-      return cache.get(dotPath);
+      return cache.get(meta.getPath());
     } catch (IOException e) {
-      throw new ParquetDecodingException(
-          "Failed to read Bloom data", e);
+      throw new RuntimeException(
+          "Failed to read Bloom filter data", e);
     }
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7fe0e410ee..860e044041 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1049,8 +1049,8 @@ private DictionaryPage readCompressedDictionary(
         converter.getEncoding(dictHeader.getEncoding()));
   }

-  public BloomFilterDataReader getBloomFilterDataReader(BlockMetaData block) {
-    return new BloomFilterDataReader(this, block);
+  public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
+    return new BloomFilterReader(this, block);
   }

   /**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 71ca5ea93d..e19e35cfac 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -29,7 +29,6 @@
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -249,10 +248,10 @@ public void testBloomWriteRead() throws Exception {
     ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
     ParquetFileReader r = new ParquetFileReader(configuration,
       readFooter.getFileMetaData(), path, Arrays.asList(readFooter.getBlocks().get(0)),
       Arrays.asList(schema.getColumnDescription(colPath)));
-    BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
-    BloomFilter bloomDataRead = bloomFilterReader.readBloomFilter(col);
-    assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("hello"))));
-    assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("world"))));
+    BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+    BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
+    assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("hello"))));
+    assertTrue(bloomFilter.findHash(bloomData.hash(Binary.fromString("world"))));
   }

   @Test