diff --git a/.travis.yml b/.travis.yml
index e4e623f03b..17d7ee7dbb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,6 +29,10 @@ before_install:
- sudo make install
- cd ..
- date
+ - git clone https://github.com/apache/parquet-format.git
+ - cd parquet-format
+ - mvn install -DskipTests
+ - cd ..
env:
- HADOOP_PROFILE=default TEST_CODECS=uncompressed,brotli
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index cfdc5553dc..83c0118ec0 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -93,7 +93,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
- <scope>test</scope>
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 41e482cfdd..4df5b71260 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,6 +35,8 @@
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.schema.MessageType;
+import java.util.HashMap;
+
/**
* This class represents all the configurable Parquet properties.
*/
@@ -48,6 +50,8 @@ public class ParquetProperties {
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+ public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
+
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
@@ -86,11 +90,16 @@ public static WriterVersion fromString(String name) {
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
+ private final boolean enableBloomFilter;
+
+ // Maps a column name to its expected number of distinct values in a row group.
+ private final HashMap<String, Long> bloomFilterExpectedDistinctNumbers;
private final int pageRowCountLimit;
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
- ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
+ ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
+ boolean enableBloomFilter, HashMap<String, Long> bloomFilterExpectedDistinctNumbers) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -104,6 +113,9 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+
+ this.enableBloomFilter = enableBloomFilter;
+ this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumbers;
this.pageRowCountLimit = pageRowCountLimit;
}
@@ -201,6 +213,14 @@ public int getPageRowCountLimit() {
return pageRowCountLimit;
}
+ public boolean isBloomFilterEnabled() {
+ return enableBloomFilter;
+ }
+
+ public HashMap<String, Long> getBloomFilterColumnExpectedNDVs() {
+ return bloomFilterExpectedDistinctNumbers;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -220,6 +240,8 @@ public static class Builder {
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+ private boolean enableBloomFilter = DEFAULT_BLOOM_FILTER_ENABLED;
+ private HashMap<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private Builder() {
@@ -236,6 +258,8 @@ private Builder(ParquetProperties toCopy) {
this.valuesWriterFactory = toCopy.valuesWriterFactory;
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
+ this.enableBloomFilter = toCopy.enableBloomFilter;
+ this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
}
/**
@@ -324,6 +348,27 @@ public Builder withColumnIndexTruncateLength(int length) {
return this;
}
+ /**
+ * Enable or disable the Bloom filter.
+ *
+ * @param enableBloomFilter whether to enable the Bloom filter.
+ * @return this builder for method chaining.
+ */
+ public Builder withBloomFilterEnabled(boolean enableBloomFilter) {
+ this.enableBloomFilter = enableBloomFilter;
+ return this;
+ }
+ /**
+ * Set the Bloom filter info for columns: the expected number of distinct values
+ * (NDV) for each column, used to size its Bloom filter.
+ *
+ * @param columnExpectedNDVs mapping from a column name to its expected NDV in a row group
+ * @return this builder for method chaining
+ */
+ public Builder withBloomFilterInfo(HashMap<String, Long> columnExpectedNDVs) {
+ this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs;
+ return this;
+ }
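+ /*
+ * A minimal usage sketch (illustrative only; the column name "id" and its NDV of one
+ * million are assumed example values, not part of this API):
+ *
+ * HashMap<String, Long> ndvs = new HashMap<>();
+ * ndvs.put("id", 1_000_000L);
+ * ParquetProperties props = ParquetProperties.builder()
+ * .withBloomFilterEnabled(true)
+ * .withBloomFilterInfo(ndvs)
+ * .build();
+ */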
+
public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
pageRowCountLimit = rowCount;
@@ -334,7 +379,8 @@ public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
- estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
+ estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit,
+ enableBloomFilter, bloomFilterColumnExpectedNDVs);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index ac9aaca26a..d7019cd766 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -34,6 +34,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;
/**
@@ -74,7 +76,7 @@ private interface ColumnWriterProvider {
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
- column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
+ column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
columns.put(path, column);
}
return column;
@@ -91,7 +93,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
- mcolumns.put(path, createColumnWriter(path, pageWriter, props));
+ mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
this.columns = unmodifiableMap(mcolumns);
@@ -105,7 +107,38 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
};
}
- abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
+ // The Bloom filter is written to its own bitset rather than to pages, so it needs a separate write store abstraction.
+ ColumnWriteStoreBase(
+ MessageType schema,
+ PageWriteStore pageWriteStore,
+ BloomFilterWriteStore bloomFilterWriteStore,
+ ParquetProperties props) {
+ this.props = props;
+ this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+ Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
+ for (ColumnDescriptor path : schema.getColumns()) {
+ PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+ if (props.isBloomFilterEnabled() && props.getBloomFilterColumnExpectedNDVs() != null) {
+ BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
+ mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
+ } else {
+ mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+ }
+ }
+ this.columns = unmodifiableMap(mcolumns);
+
+ this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+
+ columnWriterProvider = new ColumnWriterProvider() {
+ @Override
+ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+ return columns.get(path);
+ }
+ };
+ }
+
+ abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props);
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columnWriterProvider.getColumnWriter(path);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index 7258423fb4..dd13b0b8a4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,10 +22,11 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;
public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
-
public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
super(schema, pageWriteStore, props);
}
@@ -36,8 +37,15 @@ public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
super(pageWriteStore, props);
}
+ public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
+ BloomFilterWriteStore bloomFilterWriteStore,
+ ParquetProperties props) {
+ super(schema, pageWriteStore, bloomFilterWriteStore, props);
+ }
+
@Override
- ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
- return new ColumnWriterV1(path, pageWriter, props);
+ ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+ return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index bf1090d0bc..a9f2d5848d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,16 +22,24 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;
public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
-
public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
super(schema, pageWriteStore, props);
}
+ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
+ BloomFilterWriteStore bloomFilterWriteStore,
+ ParquetProperties props) {
+ super(schema, pageWriteStore, bloomFilterWriteStore, props);
+ }
+
@Override
- ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
- return new ColumnWriterV2(path, pageWriter, props);
+ ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+ return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 3788c82e46..c03b04fc5e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.impl;
import java.io.IOException;
+import java.util.HashMap;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
@@ -27,6 +28,9 @@
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
@@ -53,6 +57,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
private long rowsWrittenSoFar = 0;
private int pageRowCount;
+ private BloomFilterWriter bloomFilterWriter;
+ private BloomFilter bloomFilter;
+
ColumnWriterBase(
ColumnDescriptor path,
PageWriter pageWriter,
@@ -66,6 +73,29 @@ abstract class ColumnWriterBase implements ColumnWriter {
this.dataColumn = props.newValuesWriter(path);
}
+ ColumnWriterBase(
+ ColumnDescriptor path,
+ PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter,
+ ParquetProperties props
+ ) {
+ this(path, pageWriter, props);
+
+ // Bloom filters don't support nested columns yet; see PARQUET-1453.
+ if (path.getPath().length != 1 || bloomFilterWriter == null) {
+ return;
+ }
+
+ this.bloomFilterWriter = bloomFilterWriter;
+ HashMap<String, Long> bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs();
+ String column = path.getPath()[0];
+ if (bloomFilterColumnExpectedNDVs.containsKey(column)) {
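+ // optimalNumOfBits returns a size in bits; the BlockSplitBloomFilter constructor takes bytes, hence the division by 8 below.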
+ int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
+ BlockSplitBloomFilter.DEFAULT_FPP);
+ this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8);
+ }
+ }
+
abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);
abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
@@ -122,6 +152,36 @@ public long getBufferedSizeInMemory() {
+ pageWriter.getMemSize();
}
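+ // One overload per supported value type: hash the plain-encoded value and record it in the
+ // filter. Each is a no-op when no Bloom filter is configured for this column.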
+ private void updateBloomFilter(int value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(long value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(double value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(float value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(Binary value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
/**
* Writes the current value
*
@@ -137,6 +197,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeDouble(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -155,6 +216,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeFloat(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -173,6 +235,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeBytes(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -209,6 +272,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeInteger(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -227,6 +291,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeLong(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -246,6 +311,10 @@ void finalizeColumnChunk() {
}
dataColumn.resetDictionary();
}
+
+ if (bloomFilterWriter != null && bloomFilter != null) {
+ bloomFilterWriter.writeBloomFilter(bloomFilter);
+ }
}
/**
@@ -265,20 +334,24 @@ long getCurrentPageBufferedSize() {
* @return the number of bytes of memory used to buffer the current data and the previously written pages
*/
long getTotalBufferedSize() {
+ long bloomBufferSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
return repetitionLevelColumn.getBufferedSize()
+ definitionLevelColumn.getBufferedSize()
+ dataColumn.getBufferedSize()
- + pageWriter.getMemSize();
+ + pageWriter.getMemSize()
+ + bloomBufferSize;
}
/**
* @return actual memory used
*/
long allocatedSize() {
+ long bloomAllocatedSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
return repetitionLevelColumn.getAllocatedSize()
+ definitionLevelColumn.getAllocatedSize()
+ dataColumn.getAllocatedSize()
- + pageWriter.allocatedSize();
+ + pageWriter.allocatedSize()
+ + bloomAllocatedSize;
}
/**
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index 646e31aa7e..1d732b837d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,16 +27,21 @@
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
/**
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
*/
final class ColumnWriterV1 extends ColumnWriterBase {
-
ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
super(path, pageWriter, props);
}
+ public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+ super(path, pageWriter, bloomFilterWriter, props);
+ }
+
@Override
ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
return props.newRepetitionLevelWriter(path);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index 04076c96ba..8e9e6f7fa2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,6 +29,7 @@
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.io.ParquetEncodingException;
@@ -54,12 +55,18 @@ public BytesInput getBytes() {
}
}
- private static final ValuesWriter NULL_WRITER = new DevNullValuesWriter();
ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
super(path, pageWriter, props);
}
+ private static final ValuesWriter NULL_WRITER = new DevNullValuesWriter();
+
+ ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter,
+ ParquetProperties props) {
+ super(path, pageWriter, bloomFilterWriter, props);
+ }
+
@Override
ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
return path.getMaxRepetitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newRepetitionLevelEncoder(path));
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
new file mode 100644
index 0000000000..b6378976c3
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.api.Binary;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+
+/*
+ * This Bloom filter is implemented with the block-based algorithm from Putze et al.'s
+ * "Cache-, Hash- and Space-Efficient Bloom Filters". The basic idea is to hash each item to a
+ * tiny Bloom filter whose size fits in a single cache line or smaller. This implementation sets
+ * 8 bits in each tiny Bloom filter. Each tiny Bloom filter is 32 bytes, to take advantage of
+ * 32-byte SIMD instructions.
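+ *
+ * For example, with a 1024-byte bitset there are 32 such blocks: the top 32 bits of a 64-bit
+ * hash select the block, and the low 32 bits derive (via 8 odd SALT constants) one bit to set
+ * in each of the block's eight 32-bit words.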
+ */
+public class BlockSplitBloomFilter implements BloomFilter {
+ // Bytes in a tiny Bloom filter block.
+ private static final int BYTES_PER_BLOCK = 32;
+
+ // Default seed for the hash function. It comes from System.nanoTime().
+ private static final int DEFAULT_SEED = 1361930890;
+
+ // Minimum Bloom filter size, set to the size of a tiny Bloom filter block
+ public static final int MINIMUM_BYTES = 32;
+
+ // Maximum Bloom filter size, set to the default HDFS block size for the upper boundary check.
+ // This should be reconsidered when implementing the write-side logic.
+ public static final int MAXIMUM_BYTES = 128 * 1024 * 1024;
+
+ // The number of bits to set in a tiny Bloom filter
+ private static final int BITS_SET_PER_BLOCK = 8;
+
+ // The metadata in the header of a serialized Bloom filter is three little-endian four-byte values:
+ // the bitset length in bytes, the hash strategy, and the algorithm.
+ public static final int HEADER_SIZE = 12;
+
+ // The default false positive probability value
+ public static final double DEFAULT_FPP = 0.01;
+
+ // Hash strategy used in this Bloom filter.
+ public final HashStrategy hashStrategy;
+
+ // The underlying byte array for Bloom filter bitset.
+ private byte[] bitset;
+
+ // An IntBuffer view of the underlying bitset, used to set and test bits.
+ private IntBuffer intBuffer;
+
+ // Hash function used to compute hashes of column values.
+ private HashFunction hashFunction;
+
+ // The block-based algorithm needs 8 odd SALT values to calculate eight indexes
+ // of bits to set, one per 32-bit word.
+ private static final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+ 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+ /**
+ * Constructor of Bloom filter.
+ *
+ * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+ * [MINIMUM_BYTES, MAXIMUM_BYTES] are clamped to the nearest bound, and the result is
+ * rounded up to a power of 2. The filter uses murmur3_x64_128 as its default hash function.
+ */
+ public BlockSplitBloomFilter(int numBytes) {
+ this(numBytes, HashStrategy.MURMUR3_X64_128);
+ }
+
+ /**
+ * Constructor of a block-based Bloom filter with the given hash strategy.
+ *
+ * @param numBytes The number of bytes for Bloom filter bitset
+ * @param hashStrategy The hash strategy of Bloom filter.
+ */
+ private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
+ initBitset(numBytes);
+ switch (hashStrategy) {
+ case MURMUR3_X64_128:
+ this.hashStrategy = hashStrategy;
+ hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+ break;
+ default:
+ throw new RuntimeException("Unsupported hash strategy");
+ }
+ }
+
+ /**
+ * Construct the Bloom filter from an existing bitset, as when reconstructing a
+ * Bloom filter from a Parquet file. It uses murmur3_x64_128 as its default hash
+ * function.
+ *
+ * @param bitset The given bitset to construct the Bloom filter from.
+ */
+ public BlockSplitBloomFilter(byte[] bitset) {
+ this(bitset, HashStrategy.MURMUR3_X64_128);
+ }
+
+ /**
+ * Construct the Bloom filter from an existing bitset, as when reconstructing a
+ * Bloom filter from a Parquet file.
+ *
+ * @param bitset The given bitset to construct the Bloom filter from.
+ * @param hashStrategy The hash strategy the Bloom filter applies.
+ */
+ private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) {
+ if (bitset == null) {
+ throw new RuntimeException("Given bitset is null");
+ }
+
+ this.bitset = bitset;
+ this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+ switch (hashStrategy) {
+ case MURMUR3_X64_128:
+ this.hashStrategy = hashStrategy;
+ hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+ break;
+ default:
+ throw new RuntimeException("Unsupported hash strategy");
+ }
+ }
+
+ /**
+ * Create a new bitset for Bloom filter.
+ *
+ * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+ * [MINIMUM_BYTES, MAXIMUM_BYTES] are clamped to the nearest bound, and the result is
+ * rounded up to a power of 2.
+ */
+ private void initBitset(int numBytes) {
+ if (numBytes < MINIMUM_BYTES) {
+ numBytes = MINIMUM_BYTES;
+ }
+ // Get next power of 2 if it is not power of 2.
+ if ((numBytes & (numBytes - 1)) != 0) {
+ numBytes = Integer.highestOneBit(numBytes) << 1;
+ }
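+ // The shift can overflow to a negative value for inputs above 2^30; the next check clamps that case as well.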
+ if (numBytes > MAXIMUM_BYTES || numBytes < 0) {
+ numBytes = MAXIMUM_BYTES;
+ }
+ this.bitset = new byte[numBytes];
+ this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ // Write number of bytes of bitset.
+ out.write(BytesUtils.intToBytes(bitset.length));
+ // Write hash strategy
+ out.write(BytesUtils.intToBytes(hashStrategy.value));
+ // Write algorithm
+ out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value));
+ // Write bitset
+ out.write(bitset);
+ }
+
+ private int[] setMask(int key) {
+ int mask[] = new int[BITS_SET_PER_BLOCK];
+
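+ // Multiply the key by a distinct odd SALT constant per word; the products act as eight independent hashes.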
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ mask[i] = key * SALT[i];
+ }
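+ // Keep the top 5 bits of each product, yielding a bit index in [0, 31] within each 32-bit word.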
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ mask[i] = mask[i] >>> 27;
+ }
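+ // Turn each bit index into a single-bit mask for its word.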
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ mask[i] = 0x1 << mask[i];
+ }
+
+ return mask;
+ }
+
+ @Override
+ public void insertHash(long hash) {
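+ // The high 32 bits of the hash select the block; the bitset length is a power of two, so the
+ // AND acts as a cheap modulo. The low 32 bits are the key used to derive the block's mask.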
+ int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+ int key = (int)hash;
+
+ // Calculate mask for bucket.
+ int mask[] = setMask(key);
+ for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+ int value = intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i);
+ value |= mask[i];
+ intBuffer.put(bucketIndex * (BYTES_PER_BLOCK / 4) + i, value);
+ }
+ }
+
+ @Override
+ public boolean findHash(long hash) {
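+ // Mirror of insertHash: locate the block, then require every masked bit to be set.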
+ int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+ int key = (int)hash;
+
+ // Calculate mask for the tiny Bloom filter.
+ int mask[] = setMask(key);
+ for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+ if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Calculate the optimal bitset size in bits for the given number of distinct values and
+ * false positive probability.
+ *
+ * @param n The number of distinct values.
+ * @param p The false positive probability.
+ * @return the optimal number of bits for the given n and p.
+ */
+ public static int optimalNumOfBits(long n, double p) {
+ Preconditions.checkArgument((p > 0.0 && p < 1.0),
+ "FPP should be less than 1.0 and greater than 0.0");
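+ // For a split-block filter that sets k = 8 bits per block, the optimal size follows
+ // m = -k * n / ln(1 - p^(1/k)).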
+ final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
+ final double MAX = MAXIMUM_BYTES << 3;
+ int numBits = (int)m;
+
+ // Handle overflow.
+ if (m > MAX || m < 0) {
+ numBits = (int)MAX;
+ }
+ // Get next power of 2 if bits is not power of 2.
+ if ((numBits & (numBits - 1)) != 0) {
+ numBits = Integer.highestOneBit(numBits) << 1;
+ }
+ if (numBits < (MINIMUM_BYTES << 3)) {
+ numBits = MINIMUM_BYTES << 3;
+ }
+
+ return numBits;
+ }
+
+ @Override
+ public long hash(int value) {
+ ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
+ return hashFunction.hashBytes(plain.array()).asLong();
+ }
+
+ @Override
+ public long hash(long value) {
+ ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
+ return hashFunction.hashBytes(plain.array()).asLong();
+ }
+
+ @Override
+ public long hash(double value) {
+ ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
+ return hashFunction.hashBytes(plain.array()).asLong();
+ }
+
+ @Override
+ public long hash(float value) {
+ ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
+ return hashFunction.hashBytes(plain.array()).asLong();
+ }
+
+ @Override
+ public long hash(Binary value) {
+ return hashFunction.hashBytes(value.getBytes()).asLong();
+ }
+
+ @Override
+ public long getBitsetSize() {
+ return this.bitset.length;
+ }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
new file mode 100644
index 0000000000..3ec192e3e0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.io.api.Binary;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A Bloom filter is a compact structure to indicate whether an item is not in a set or probably
+ * in a set. A Bloom filter usually consists of a bitset that represents a set of elements,
+ * a hash strategy, and an algorithm.
+ */
+public interface BloomFilter {
+ // Bloom filter Hash strategy.
+ enum HashStrategy {
+ MURMUR3_X64_128(0);
+ HashStrategy(int value) {
+ this.value = value;
+ }
+ int value;
+ }
+
+ // Bloom filter algorithm.
+ enum Algorithm {
+ BLOCK(0);
+ Algorithm(int value) {
+ this.value = value;
+ }
+ int value;
+ }
+
+ /**
+ * Write the Bloom filter to an output stream. It writes the Bloom filter header first (the
+ * bitset's length in bytes, the hash strategy, and the algorithm, each as a four-byte
+ * little-endian value) and then the bitset itself.
+ *
+ * @param out the output stream to write to
+ * @throws IOException if there is an error while writing
+ */
+ void writeTo(OutputStream out) throws IOException;
+
+ /**
+ * Insert an element into the Bloom filter; the element is represented by
+ * the hash of its plain-encoded value.
+ *
+ * @param hash the hash of the element.
+ */
+ void insertHash(long hash);
+
+ /**
+ * Determine whether an element might be in the set.
+ *
+ * @param hash the hash of the element's plain-encoded value.
+ * @return false if the element is definitely not in the set; true if it probably is.
+ */
+ boolean findHash(long hash);
+
+ /**
+ * Compute hash for int value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(int value);
+
+ /**
+ * Compute hash for long value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(long value);
+
+ /**
+ * Compute hash for double value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(double value);
+
+ /**
+ * Compute hash for float value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(float value);
+
+ /**
+ * Compute hash for Binary value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(Binary value);
+
+ /**
+ * Get the number of bytes for bitset in this Bloom filter.
+ *
+ * @return The number of bytes for bitset in this Bloom filter.
+ */
+ long getBitsetSize();
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
new file mode 100644
index 0000000000..3373bc1a0e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Contains the Bloom filter readers for all columns of a row group.
+ */
+public interface BloomFilterReadStore {
+ /**
+ * Get the Bloom filter reader of a column.
+ *
+ * @param path the descriptor of the column
+ * @return the corresponding Bloom filter reader
+ */
+ BloomFilterReader getBloomFilterReader(ColumnDescriptor path);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
new file mode 100644
index 0000000000..7a430581dd
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+public interface BloomFilterReader {
+ /**
+ * Returns a {@link BloomFilter} for the given column descriptor.
+ *
+ * @param path the descriptor of the column
+ * @return the Bloom filter data for that column, or null if there isn't one
+ */
+ BloomFilter readBloomFilter(ColumnDescriptor path);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
new file mode 100644
index 0000000000..f7e28fdf2d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Contains the Bloom filter writers for all columns of a row group.
+ */
+public interface BloomFilterWriteStore {
+ /**
+ * Get the Bloom filter writer of a column.
+ *
+ * @param path the descriptor for the column
+ * @return the corresponding Bloom filter writer
+ */
+ BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
new file mode 100644
index 0000000000..0fab73b2a4
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+public interface BloomFilterWriter {
+ /**
+ * Write a Bloom filter.
+ *
+ * @param bloomFilter the Bloom filter to write
+ */
+ void writeBloomFilter(BloomFilter bloomFilter);
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
new file mode 100644
index 0000000000..8dbb0ba193
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.parquet.column.values.RandomStr;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockSplitBloomFilter {
+ @Test
+ public void testConstructor() throws IOException {
+ BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0);
+ assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.MINIMUM_BYTES);
+ BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.MAXIMUM_BYTES + 1);
+ assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.MAXIMUM_BYTES);
+ BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000);
+ assertEquals(bloomFilter3.getBitsetSize(), 1024);
+ }
+
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
+
+ /*
+ * This test covers the basic operations: inserting, finding, and
+ * serializing/de-serializing.
+ */
+ @Test
+ public void testBasic() throws IOException {
+ final String testStrings[] = {"hello", "parquet", "bloom", "filter"};
+ BloomFilter bloomFilter = new BlockSplitBloomFilter(1024);
+
+ for(int i = 0; i < testStrings.length; i++) {
+ bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(testStrings[i])));
+ }
+
+ File testFile = temp.newFile();
+ FileOutputStream fileOutputStream = new FileOutputStream(testFile);
+ bloomFilter.writeTo(fileOutputStream);
+ fileOutputStream.close();
+ FileInputStream fileInputStream = new FileInputStream(testFile);
+
+ byte[] value = new byte[4];
+ fileInputStream.read(value);
+ int length = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(length, 1024);
+
+ fileInputStream.read(value);
+ int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal());
+
+ fileInputStream.read(value);
+ int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal());
+
+ byte[] bitset = new byte[length];
+ fileInputStream.read(bitset);
+ bloomFilter = new BlockSplitBloomFilter(bitset);
+ for(int i = 0; i < testStrings.length; i++) {
+ assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testStrings[i]))));
+ }
+ }
+
+ @Test
+ public void testFPP() throws IOException {
+ final int totalCount = 100000;
+ final double FPP = 0.01;
+ final long SEED = 104729;
+
+ BloomFilter bloomFilter = new BlockSplitBloomFilter(BlockSplitBloomFilter.optimalNumOfBits(totalCount, FPP) / 8);
+ List<String> strings = new ArrayList<>();
+ RandomStr randomStr = new RandomStr(new Random(SEED));
+ for(int i = 0; i < totalCount; i++) {
+ String str = randomStr.get(10);
+ strings.add(str);
+ bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(str)));
+ }
+
+ // Count how many times findHash reports a string that was never inserted as present.
+ int exist = 0;
+ for (int i = 0; i < totalCount; i++) {
+ String str = randomStr.get(8);
+ if (bloomFilter.findHash(bloomFilter.hash(Binary.fromString(str)))) {
+ exist++;
+ }
+ }
+
+ // The false-positive count should stay below totalCount * FPP, i.e. about 1000 at FPP = 0.01.
+ assertTrue(exist < totalCount * FPP);
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index b9c8996f0f..2ede5c483e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -457,6 +457,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups,
columnMetaData.getTotalSize(),
columnMetaData.getFirstDataPageOffset());
columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
+ columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
if (!columnMetaData.getStatistics().isEmpty()) {
columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
}
@@ -1185,6 +1186,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
messageType.getType(path.toArray()).asPrimitiveType()),
metaData.data_page_offset,
metaData.dictionary_page_offset,
+ metaData.bloom_filter_offset,
metaData.num_values,
metaData.total_compressed_size,
metaData.total_uncompressed_size);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
new file mode 100644
index 0000000000..96e258fe40
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.parquet.Strings;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.ParquetDecodingException;
+/**
+ * A {@link BloomFilterReader} implementation that reads Bloom filter data from
+ * an open {@link ParquetFileReader}.
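+ * <p>Typical use (illustrative): obtain a reader for a row group via
+ * {@code ParquetFileReader#getBloomFilterDataReader(BlockMetaData)} and call
+ * {@code readBloomFilter(ColumnDescriptor)} for each column of interest.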
+ *
+ */
+public class BloomFilterDataReader implements BloomFilterReader {
+ private final ParquetFileReader reader;
+ private final Map<String, ColumnChunkMetaData> columns;
+ private final Map<String, BloomFilter> cache = new HashMap<>();
+ public BloomFilterDataReader(ParquetFileReader fileReader, BlockMetaData block) {
+ this.reader = fileReader;
+ this.columns = new HashMap<>();
+ for (ColumnChunkMetaData column : block.getColumns()) {
+ columns.put(column.getPath().toDotString(), column);
+ }
+ }
+ @Override
+ public BloomFilter readBloomFilter(ColumnDescriptor descriptor) {
+ String dotPath = Strings.join(descriptor.getPath(), ".");
+ ColumnChunkMetaData column = columns.get(dotPath);
+ if (column == null) {
+ throw new ParquetDecodingException(
+ "Cannot load Bloom filter data, unknown column: " + dotPath);
+ }
+ try {
+ // Do the check-and-load entirely inside the lock; an unsynchronized pre-check on a
+ // HashMap is not safe while another thread may be mutating it.
+ synchronized (cache) {
+ if (!cache.containsKey(dotPath)) {
+ BloomFilter bloomFilter = reader.readBloomFilter(column);
+ if (bloomFilter == null) return null;
+ cache.put(dotPath, bloomFilter);
+ }
+ return cache.get(dotPath);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException(
+ "Failed to read Bloom filter data", e);
+ }
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 01867c6797..7fe0e410ee 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -33,6 +33,8 @@
import java.io.IOException;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -69,6 +71,8 @@
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1045,6 +1049,46 @@ private DictionaryPage readCompressedDictionary(
converter.getEncoding(dictHeader.getEncoding()));
}
+ public BloomFilterDataReader getBloomFilterDataReader(BlockMetaData block) {
+ return new BloomFilterDataReader(this, block);
+ }
+
+ /**
+ * Reads Bloom filter data for the given column chunk.
+ *
+ * @param meta a column's ColumnChunkMetaData to read the Bloom filter from
+ * @return a BloomFilter object, or null if the header is not recognized.
+ * @throws IOException if there is an error while reading the Bloom filter.
+ */
+ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
+ long bloomFilterOffset = meta.getBloomFilterOffset();
+ f.seek(bloomFilterOffset);
+
+ // Read Bloom filter data header.
+ byte[] bytes = new byte[BlockSplitBloomFilter.HEADER_SIZE];
+ f.readFully(bytes);
+ ByteBuffer bloomHeader = ByteBuffer.wrap(bytes);
+ IntBuffer headerBuffer = bloomHeader.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+ int numBytes = headerBuffer.get();
+ if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.MAXIMUM_BYTES) {
+ return null;
+ }
+
+ BloomFilter.HashStrategy hash = BloomFilter.HashStrategy.values()[headerBuffer.get()];
+ if (hash != BlockSplitBloomFilter.HashStrategy.MURMUR3_X64_128) {
+ return null;
+ }
+
+ BloomFilter.Algorithm algorithm = BloomFilter.Algorithm.values()[headerBuffer.get()];
+ if (algorithm != BlockSplitBloomFilter.Algorithm.BLOCK) {
+ return null;
+ }
+
+ byte[] bitset = new byte[numBytes];
+ f.readFully(bitset);
+ return new BlockSplitBloomFilter(bitset);
+ }
+
/**
* @param column
* the column chunk which the column index is to be returned for
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 20efe47573..1fc2c1360e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,6 +61,7 @@
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.example.DummyRecordConverter;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -150,6 +151,7 @@ public static enum Mode {
private long currentChunkValueCount; // set in startColumn
private long currentChunkFirstDataPage; // set in startColumn (out.pos())
private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage
+ private long currentChunkBloomFilterDataOffset; // set in writeBloomData
// set when end is called
private ParquetMetadata footer = null;
@@ -408,6 +410,16 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOExceptio
currentEncodings.add(dictionaryPage.getEncoding());
}
+ /**
+ * Writes a Bloom filter for the current column chunk.
+ * @param bloomFilter the Bloom filter of the column's values
+ * @throws IOException if there is an error while writing
+ */
+ public void writeBloomFilter(BloomFilter bloomFilter) throws IOException {
+ state = state.write();
+ currentChunkBloomFilterDataOffset = out.getPos();
+ bloomFilter.writeTo(out);
+ }
/**
* writes a single page
@@ -626,6 +638,7 @@ public void endColumn() throws IOException {
currentStatistics,
currentChunkFirstDataPage,
currentChunkDictionaryPageOffset,
+ currentChunkBloomFilterDataOffset,
currentChunkValueCount,
compressedLength,
uncompressedLength));
@@ -885,6 +898,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
chunk.getStatistics(),
newChunkStart,
newChunkStart,
+ chunk.getBloomFilterOffset(),
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
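The Bloom filter slots into the writer's existing state machine: writeBloomFilter records the current output position, and endColumn persists that offset into the chunk metadata. A condensed sketch of the call order, mirroring testBloomWriteRead further down (`conf`, `schema`, `path`, the page arguments, and `extraMetaData` are placeholders for the caller's setup; imports elided):

```java
ParquetFileWriter w = new ParquetFileWriter(conf, schema, path);
w.start();
w.startBlock(recordCount);
w.startColumn(columnDescriptor, valueCount, codec);
w.writeDataPage(rowCount, uncompressedSize, bytes, stats, BIT_PACKED, BIT_PACKED, PLAIN);
BloomFilter bloom = new BlockSplitBloomFilter(numBytes); // sized from the expected NDV
bloom.insertHash(bloom.hash(Binary.fromString("hello")));
w.writeBloomFilter(bloom); // records out.getPos() as the chunk's Bloom filter offset
w.endColumn();             // persists the offset into ColumnChunkMetaData
w.endBlock();
w.end(extraMetaData);
```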
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index b8fce2f65d..4d6f42c2b8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -134,6 +134,12 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
*/
public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled";
+ /**
+ * key to configure whether row group bloom filtering is enabled
+ */
+ public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";
+ public static final boolean BLOOM_FILTER_ENABLED_DEFAULT = false;
+
/**
* key to turn on or off task side metadata loading (default true)
* if true then metadata is read on the task side and some tasks may finish immediately.
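A one-line usage sketch for the new read-side switch, which defaults to off per BLOOM_FILTER_ENABLED_DEFAULT:

```java
Configuration conf = new Configuration();
conf.setBoolean(ParquetInputFormat.BLOOM_FILTERING_ENABLED, true); // opt in to row-group Bloom filtering
```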
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 04cbd15c0b..33c3715378 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
import java.io.IOException;
+import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -144,6 +145,9 @@ public static enum JobSummaryLevel {
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+ public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names";
+ public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+ public static final String ENABLE_BLOOM_FILTER = "parquet.enable.bloom.filter";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
@@ -210,6 +214,34 @@ public static boolean getEnableDictionary(JobContext jobContext) {
return getEnableDictionary(getConfiguration(jobContext));
}
+ public static HashMap<String, Long> getBloomFilterColumnExpectedNDVs(Configuration conf) {
+ HashMap<String, Long> kv = new HashMap<>();
+ String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+ String expectedNDVsConf = conf.get(BLOOM_FILTER_EXPECTED_NDV);
+
+ if (columnNamesConf == null || expectedNDVsConf == null) {
+ return kv;
+ }
+
+ String[] columnNames = columnNamesConf.split(",");
+ String[] expectedNDVs = expectedNDVsConf.split(",");
+
+ if (columnNames.length == expectedNDVs.length) {
+ for (int i = 0; i < columnNames.length; i++) {
+ kv.put(columnNames[i], Long.parseLong(expectedNDVs[i]));
+ }
+ } else {
+ LOG.warn("The number of Bloom filter column names does not match the number of expected NDVs; ignoring both settings");
+ }
+
+ return kv;
+ }
+
+ public static boolean getEnableBloomFilter(Configuration configuration) {
+ return configuration.getBoolean(ENABLE_BLOOM_FILTER,
+ ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED);
+ }
+
public static int getBlockSize(JobContext jobContext) {
return getBlockSize(getConfiguration(jobContext));
}
@@ -388,6 +420,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp
.withPageSize(getPageSize(conf))
.withDictionaryPageSize(getDictionaryPageSize(conf))
.withDictionaryEncoding(getEnableDictionary(conf))
+ .withBloomFilterEnabled(getEnableBloomFilter(conf))
+ .withBloomFilterInfo(getBloomFilterColumnExpectedNDVs(conf))
.withWriterVersion(getWriterVersion(conf))
.estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
@@ -412,6 +446,10 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+ LOG.info("Bloom Filter is {}", props.isBloomFilterEnabled()? "on": "off");
+ LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumnExpectedNDVs().keySet());
+ LOG.info("Bloom filter enabled column expected number of distinct values are: {}",
+ props.getBloomFilterColumnExpectedNDVs().values());
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
}
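Taken together, the write path is driven by three keys: the on/off switch plus two parallel comma-separated lists pairing each column name with its expected NDV (a length mismatch is logged and both lists are ignored, per getBloomFilterColumnExpectedNDVs above). A usage sketch with hypothetical column names:

```java
Configuration conf = new Configuration();
conf.setBoolean(ParquetOutputFormat.ENABLE_BLOOM_FILTER, true);
// Positional pairing: "id" expects ~1,000,000 distinct values, "name" ~50,000.
conf.set(ParquetOutputFormat.BLOOM_FILTER_COLUMN_NAMES, "id,name");
conf.set(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, "1000000,50000");
```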
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index e6aa1043b4..3156132534 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -126,6 +126,7 @@ && positiveLongFitsInAnInt(totalUncompressedSize)) {
statistics,
firstDataPage,
dictionaryPageOffset,
+ 0, // no Bloom filter offset recorded by this legacy overload
valueCount,
totalSize,
totalUncompressedSize);
@@ -136,12 +137,56 @@ && positiveLongFitsInAnInt(totalUncompressedSize)) {
statistics,
firstDataPage,
dictionaryPageOffset,
+ 0, // no Bloom filter offset recorded by this legacy overload
valueCount,
totalSize,
totalUncompressedSize);
}
}
+ public static ColumnChunkMetaData get(
+ ColumnPath path,
+ PrimitiveType type,
+ CompressionCodecName codec,
+ EncodingStats encodingStats,
+ Set<Encoding> encodings,
+ Statistics statistics,
+ long firstDataPage,
+ long dictionaryPageOffset,
+ long bloomFilterDataOffset,
+ long valueCount,
+ long totalSize,
+ long totalUncompressedSize) {
+ // to save space we store those always positive longs in ints when they fit.
+ if (positiveLongFitsInAnInt(firstDataPage)
+ && positiveLongFitsInAnInt(dictionaryPageOffset)
+ && positiveLongFitsInAnInt(valueCount)
+ && positiveLongFitsInAnInt(totalSize)
+ && positiveLongFitsInAnInt(totalUncompressedSize)) {
+ return new IntColumnChunkMetaData(
+ path, type, codec,
+ encodingStats, encodings,
+ statistics,
+ firstDataPage,
+ dictionaryPageOffset,
+ bloomFilterDataOffset,
+ valueCount,
+ totalSize,
+ totalUncompressedSize);
+ } else {
+ return new LongColumnChunkMetaData(
+ path, type, codec,
+ encodingStats, encodings,
+ statistics,
+ firstDataPage,
+ dictionaryPageOffset,
+ bloomFilterDataOffset,
+ valueCount,
+ totalSize,
+ totalUncompressedSize);
+ }
+ }
+
/**
* @return the offset of the first byte in the chunk
*/
@@ -220,6 +265,11 @@ public PrimitiveType getPrimitiveType() {
*/
abstract public long getDictionaryPageOffset();
+ /**
+ * @return the offset of the Bloom filter data, if any
+ */
+ abstract public long getBloomFilterOffset();
+
/**
* @return count of values in this block of the column
*/
@@ -295,6 +345,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
private final int firstDataPage;
private final int dictionaryPageOffset;
+ private final int bloomFilterDataOffset;
private final int valueCount;
private final int totalSize;
private final int totalUncompressedSize;
@@ -321,12 +372,14 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
Statistics statistics,
long firstDataPage,
long dictionaryPageOffset,
+ long bloomFilterDataOffset,
long valueCount,
long totalSize,
long totalUncompressedSize) {
super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
this.firstDataPage = positiveLongToInt(firstDataPage);
this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset);
+ this.bloomFilterDataOffset = positiveLongToInt(bloomFilterDataOffset);
this.valueCount = positiveLongToInt(valueCount);
this.totalSize = positiveLongToInt(totalSize);
this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize);
@@ -368,6 +421,13 @@ public long getDictionaryPageOffset() {
return intToPositiveLong(dictionaryPageOffset);
}
+ /**
+ * @return the offset of the Bloom filter data, if any
+ */
+ public long getBloomFilterOffset() {
+ return intToPositiveLong(bloomFilterDataOffset);
+ }
+
/**
* @return count of values in this block of the column
*/
@@ -400,6 +460,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
private final long firstDataPageOffset;
private final long dictionaryPageOffset;
+ private final long bloomFilterDataOffset;
private final long valueCount;
private final long totalSize;
private final long totalUncompressedSize;
@@ -426,12 +487,14 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
Statistics statistics,
long firstDataPageOffset,
long dictionaryPageOffset,
+ long bloomFilterDataOffset,
long valueCount,
long totalSize,
long totalUncompressedSize) {
super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
this.firstDataPageOffset = firstDataPageOffset;
this.dictionaryPageOffset = dictionaryPageOffset;
+ this.bloomFilterDataOffset = bloomFilterDataOffset;
this.valueCount = valueCount;
this.totalSize = totalSize;
this.totalUncompressedSize = totalUncompressedSize;
@@ -452,6 +515,13 @@ public long getDictionaryPageOffset() {
return dictionaryPageOffset;
}
+ /**
+ * @return the offset of the Bloom filter data, if any
+ */
+ public long getBloomFilterOffset() {
+ return bloomFilterDataOffset;
+ }
+
/**
* @return count of values in this block of the column
*/
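The new offset rides on the existing footer-memory optimization: IntColumnChunkMetaData packs each always-positive long into an int by shifting it by Integer.MIN_VALUE, which maps the full [0, 2^32 - 1] range bijectively onto the signed int range. A sketch of the helpers this relies on (the actual implementations live elsewhere in ColumnChunkMetaData):

```java
static boolean positiveLongFitsInAnInt(long value) {
  return value >= 0 && value + Integer.MIN_VALUE <= Integer.MAX_VALUE; // 0 <= value <= 2^32 - 1
}
static int positiveLongToInt(long value) {
  return (int) (value + Integer.MIN_VALUE); // shift down into the int range
}
static long intToPositiveLong(int value) {
  return (long) value - Integer.MIN_VALUE;  // shift back to the original offset
}
```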
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 917ad57910..0cfb001d49 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,9 @@
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.junit.Assume;
import org.junit.Rule;
@@ -219,6 +222,42 @@ public void testWriteRead() throws Exception {
PrintFooter.main(new String[] {path.toString()});
}
+ @Test
+ public void testBloomWriteRead() throws Exception {
+ MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
+ File testFile = temp.newFile();
+ testFile.delete();
+ Path path = new Path(testFile.toURI());
+ Configuration configuration = new Configuration();
+ configuration.set("parquet.bloomFilter.filter.column.names", "foo");
+ String[] colPath = {"foo"};
+ ColumnDescriptor col = schema.getColumnDescription(colPath);
+ BinaryStatistics stats1 = new BinaryStatistics();
+ ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+ w.start();
+ w.startBlock(3);
+ w.startColumn(col, 5, CODEC);
+ w.writeDataPage(2, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+ w.writeDataPage(3, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+ BloomFilter bloomData = new BlockSplitBloomFilter(0);
+ bloomData.insertHash(bloomData.hash(Binary.fromString("hello")));
+ bloomData.insertHash(bloomData.hash(Binary.fromString("world")));
+ long blStarts = w.getPos();
+ w.writeBloomFilter(bloomData);
+ w.endColumn();
+ w.endBlock();
+ w.end(new HashMap<String, String>());
+ ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+ assertEquals("bloomFilter offset",
+ blStarts, readFooter.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
+ ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
+ BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+ BloomFilter bloomDataRead = bloomFilterReader.readBloomFilter(col);
+ assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("hello"))));
+ assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("world"))));
+ }
+
@Test
public void testAlignmentWithPadding() throws Exception {
File testFile = temp.newFile();
diff --git a/pom.xml b/pom.xml
index 9324a1a4bc..1189afb5a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
1.2.1
2.7.1
3.1.2
- <parquet.format.version>2.6.0</parquet.format.version>
+ <parquet.format.version>2.7.0-SNAPSHOT</parquet.format.version>
1.7.0
thrift
thrift