@@ -19,7 +19,7 @@

package org.apache.parquet.cli.util;

-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;

Review comment: Please keep changes like this in a different patch. Every patch should have a single purpose.

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
@@ -385,7 +385,7 @@ public int hashCode() {

@Override
public String toString() {
-  return Objects.toStringHelper(this)
+  return MoreObjects.toStringHelper(this)
.add("type", type)
.add("value", value)
.add("children", children)
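
For context on the Guava change above: Objects.toStringHelper was deprecated in Guava 18.0 and later removed, with MoreObjects.toStringHelper as the drop-in replacement. A minimal sketch of the after state, assuming a holder class with the same three fields as the diff (the class name here is hypothetical, since the file name is not visible in this view):

import com.google.common.base.MoreObjects;

class Node { // hypothetical name; the real class is not shown in this diff
  String type;
  Object value;
  Object children;

  @Override
  public String toString() {
    // Previously Objects.toStringHelper(this) -- same fluent API, new home.
    return MoreObjects.toStringHelper(this)
        .add("type", type)
        .add("value", value)
        .add("children", children)
        .toString();
  }
}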
1 change: 0 additions & 1 deletion parquet-column/pom.xml
@@ -93,7 +93,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
-  <scope>test</scope>
</dependency>
</dependencies>

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -18,21 +18,23 @@
*/
package org.apache.parquet.column;

+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+
+import java.util.HashMap;

import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.HeapByteBufferAllocator;

-import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
+import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.schema.MessageType;

/**
Expand All @@ -47,6 +49,7 @@ public class ParquetProperties {
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

@@ -83,10 +86,12 @@ public static WriterVersion fromString(String name) {
private final boolean estimateNextSizeCheck;
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;
private final boolean enableBloomFilter;
private final HashMap<String, Long> bloomFilterInfo;

Review comment: Please be more specific: what info?

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-  ValuesWriterFactory writerFactory) {
+  ValuesWriterFactory writerFactory, boolean enableBloomFilter, HashMap<String, Long> bloomFilterInfo) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -97,7 +102,8 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = estimateNextSizeCheck;
this.allocator = allocator;

this.enableBloomFilter = enableBloomFilter;
this.bloomFilterInfo = bloomFilterInfo;
this.valuesWriterFactory = writerFactory;
}

@@ -159,6 +165,14 @@ public ByteBufferAllocator getAllocator() {
return allocator;
}

public boolean isBloomFilterEnabled() {
return enableBloomFilter;
}

public HashMap<String, Long> getBloomFilterInfo() {
return bloomFilterInfo;
}

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
switch (writerVersion) {
@@ -199,6 +213,8 @@ public static class Builder {
private int pageSize = DEFAULT_PAGE_SIZE;
private int dictPageSize = DEFAULT_DICTIONARY_PAGE_SIZE;
private boolean enableDict = DEFAULT_IS_DICTIONARY_ENABLED;
private boolean enableBloomFilter = DEFAULT_BLOOM_FILTER_ENABLED;
private HashMap<String, Long> bloomFilterInfo = new HashMap<>();
private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
@@ -217,6 +233,8 @@ private Builder(ParquetProperties toCopy) {
this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
this.allocator = toCopy.allocator;
this.enableBloomFilter = toCopy.enableBloomFilter;
this.bloomFilterInfo = toCopy.bloomFilterInfo;
}

/**
@@ -256,6 +274,38 @@ public Builder withDictionaryPageSize(int dictionaryPageSize) {
return this;
}

/**
 * Enable or disable Bloom filters for this writer.
 *
 * @param enableBloomFilter whether Bloom filters should be written.
 * @return this builder for method chaining.
 */
public Builder withBloomFilterEnabled(boolean enableBloomFilter) {
this.enableBloomFilter = enableBloomFilter;
return this;
}

/**
* Set Bloom filter info for columns.
*
* @param names the columns to be enable for Bloom filter

Review comment: nit: "enabled"

* @param sizes the sizes corresponding to columns

Review comment: How do you measure "size"? Do you mean the number of distinct values?

* @return this builder for method chaining
*/
public Builder withBloomFilterInfo(String names, String sizes) {

Review comment: Why not List<Column> where class Column { String name; long countDistinct; }, or maybe List<String> names, List<Long> sizes?

Author reply: If we want to use a List here, we would have to parse the string into a List early, which requires a copy from the string array into a List.

String[] bloomFilterColumns = names.split(",");
String[] bloomFilterSizes = sizes.split(",");

Preconditions.checkArgument(bloomFilterColumns.length == bloomFilterSizes.length,
"Column names are not matched to sizes");

for (int i = 0; i < bloomFilterColumns.length; i++) {
bloomFilterInfo.put(bloomFilterColumns[i], Long.parseLong(bloomFilterSizes[i]));
}

return this;
}
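
For illustration, a sketch of the reviewer's suggested strongly typed alternative; the Column carrier class and the method name are hypothetical, not part of this patch:

// Hypothetical carrier type, as proposed in the review.
class Column {
  final String name;
  final long countDistinct;

  Column(String name, long countDistinct) {
    this.name = name;
    this.countDistinct = countDistinct;
  }
}

// Possible replacement for withBloomFilterInfo(String, String):
// callers pass already-parsed values, so the builder does no string splitting.
public Builder withBloomFilterColumns(java.util.List<Column> columns) {
  for (Column c : columns) {
    bloomFilterInfo.put(c.name, c.countDistinct);
  }
  return this;
}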

/**
* Set the {@link WriterVersion format version}.
*
@@ -303,7 +353,8 @@ public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-  estimateNextSizeCheck, allocator, valuesWriterFactory);
+  estimateNextSizeCheck, allocator, valuesWriterFactory,
+  enableBloomFilter, bloomFilterInfo);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
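
A usage sketch of the two new builder options, assuming the existing ParquetProperties.builder() factory; the column names and sizes below are purely illustrative:

ParquetProperties props = ParquetProperties.builder()
    .withBloomFilterEnabled(true)
    .withBloomFilterInfo("id,name", "100000,50000") // columns and their sizes
    .build();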
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -25,27 +25,35 @@
import java.util.Set;
import java.util.TreeMap;

import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;

public class ColumnWriteStoreV1 implements ColumnWriteStore {

private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
private final PageWriteStore pageWriteStore;
private final ParquetProperties props;
private BloomFilterWriteStore bloomFilterWriteStore;

public ColumnWriteStoreV1(PageWriteStore pageWriteStore,
ParquetProperties props) {
this.pageWriteStore = pageWriteStore;
this.props = props;
}

public ColumnWriteStoreV1(PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
this(pageWriteStore, props);
this.bloomFilterWriteStore = bloomFilterWriteStore;
}

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterV1 column = columns.get(path);
if (column == null) {
@@ -61,7 +69,13 @@ public Set<ColumnDescriptor> getColumnDescriptors() {

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-  return new ColumnWriterV1(path, pageWriter, props);
+
+  if (props.isBloomFilterEnabled() && props.getBloomFilterInfo() != null) {
+    BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
+    return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
+  } else {
+    return new ColumnWriterV1(path, pageWriter, props);
+  }
}
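
The BloomFilterWriteStore and BloomFilterWriter types used in newMemColumn are introduced elsewhere in this patch and are not shown in this diff; a minimal sketch of what such interfaces could look like, stated as an assumption for orientation only:

package org.apache.parquet.column.values.bloomfilter;

import org.apache.parquet.column.ColumnDescriptor;

// Assumed shape: hands out one Bloom filter writer per column,
// mirroring how PageWriteStore hands out PageWriters.
interface BloomFilterWriteStore {
  BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
}

// Assumed shape: a sink that persists a finished Bloom filter,
// matching the writeBloomFilter(bloomFilter) call in ColumnWriterV1.flush().
interface BloomFilterWriter {
  void writeBloomFilter(BloomFilter bloomFilter);
}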

@Override
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -35,6 +35,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV2 implements ColumnWriteStore {
@@ -66,6 +68,30 @@ public ColumnWriteStoreV2(
this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
}

public ColumnWriteStoreV2(
MessageType schema,
PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
this.props = props;

Review comment: Please just check props.isBloomFilterEnabled() here and delegate to the other constructor if it is not true.

Author reply: In Java, a call to another constructor must be the first statement in a constructor, so the delegation cannot be placed behind that check.

this.thresholdTolerance = (long)(props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();

for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
if (props.isBloomFilterEnabled() && props.getBloomFilterInfo() != null) {
BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props));
} else {
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, props));
}
}
this.columns = unmodifiableMap(mcolumns);
this.writers = this.columns.values();

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
}
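
A minimal sketch of the language constraint the author is pointing to, along with the usual workaround (a static factory method); the names here are illustrative only:

// Illustrative only; not part of the patch.
class Delegating {
  Delegating() { /* common setup */ }

  Delegating(boolean useBloomFilters) {
    // A this(...) call must be the first statement of a constructor,
    // so it cannot be guarded:
    //   if (!useBloomFilters) { this(); }  // does not compile
    // which is why the constructor above repeats the setup instead.
  }

  // A static factory, by contrast, is free to branch before construction.
  static Delegating create(boolean useBloomFilters) {
    return useBloomFilters ? new Delegating(true) : new Delegating();
  }
}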

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columns.get(path);
}
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -21,6 +21,7 @@
import static org.apache.parquet.bytes.BytesInput.concat;

import java.io.IOException;
import java.util.HashMap;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
Expand All @@ -29,6 +30,8 @@
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
@@ -55,6 +58,23 @@ final class ColumnWriterV1 implements ColumnWriter {
private int valueCountForNextSizeCheck;

private Statistics statistics;
private BloomFilterWriter bloomFilterWriter;
private BloomFilter bloomFilter;

public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
BloomFilterWriter bloomFilterWriter, ParquetProperties props) {

Review comment: nit: should you check that bloomFilterWriter is not null?

this(path, pageWriter, props);

// Currently nested columns are not supported.
if (path.getPath().length == 1) {

Review comment: nit: you can remove a level of indentation here via if (path.getPath().length != 1) return;

this.bloomFilterWriter = bloomFilterWriter;
HashMap<String, Long> bloomFilterInfo = props.getBloomFilterInfo();
String column = path.getPath()[0];
if (bloomFilterInfo.containsKey(column)) {
this.bloomFilter = new BloomFilter(bloomFilterInfo.get(column).intValue());

Review comment: nit: do you want longValue()?

}
}
}
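
Relating to the earlier review question about what "size" means: if the sizes passed to withBloomFilterInfo are interpreted as expected distinct-value counts, the standard sizing rule for a Bloom filter holding n distinct values at false-positive probability p is m = -n * ln(p) / (ln 2)^2 bits. A sizing sketch under that assumption; this helper is not part of the patch:

// Illustrative sizing helper, assuming "size" means the expected number
// of distinct values (NDV); not part of this patch.
static int optimalNumOfBits(long ndv, double falsePositiveProbability) {
  double bits = -ndv * Math.log(falsePositiveProbability) / (Math.log(2) * Math.log(2));
  return (int) bits; // callers typically round up to a power of two
}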

public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
ParquetProperties props) {
@@ -177,6 +197,9 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
definitionLevelColumn.writeInteger(definitionLevel);

Review comment: If you aren't going to write data for booleans or nulls, please note that in your parquet-format patch, apache/parquet-format#112

dataColumn.writeDouble(value);
updateStatistics(value);
if (bloomFilter != null) {
bloomFilter.insert(bloomFilter.hash(value));
}
accountForValueWritten();
}

@@ -187,6 +210,9 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
definitionLevelColumn.writeInteger(definitionLevel);
dataColumn.writeFloat(value);
updateStatistics(value);
if (bloomFilter != null) {
bloomFilter.insert(bloomFilter.hash(value));
}
accountForValueWritten();
}

@@ -197,6 +223,9 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
definitionLevelColumn.writeInteger(definitionLevel);
dataColumn.writeBytes(value);
updateStatistics(value);
if (bloomFilter != null) {
bloomFilter.insert(bloomFilter.hash(value));
}
accountForValueWritten();
}

@@ -217,6 +246,9 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
definitionLevelColumn.writeInteger(definitionLevel);
dataColumn.writeInteger(value);
updateStatistics(value);
if (bloomFilter != null) {
bloomFilter.insert(bloomFilter.hash(value));
}
accountForValueWritten();
}

@@ -227,6 +259,9 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
definitionLevelColumn.writeInteger(definitionLevel);
dataColumn.writeLong(value);
updateStatistics(value);
if (bloomFilter != null) {
bloomFilter.insert(bloomFilter.hash(value));
}
accountForValueWritten();
}
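
The hash-then-insert pattern repeated in the write() methods above implies a BloomFilter with per-type hash overloads and an insert(long) operation. A minimal bit-set-backed stand-in, stated as an assumption only (the real class is added elsewhere in this patch and likely uses stronger hashing and multiple probes):

import java.util.BitSet;

// Simplified stand-in for the BloomFilter used above; assumption only.
class SimpleBloomFilter {
  private final BitSet bits;
  private final int numBits;

  SimpleBloomFilter(int numBits) {
    this.numBits = numBits;
    this.bits = new BitSet(numBits);
  }

  long hash(long value) { return Long.hashCode(value); }
  long hash(double value) { return Double.hashCode(value); }

  void insert(long hash) {
    bits.set((int) Math.floorMod(hash, (long) numBits)); // set one probe bit
  }

  boolean mightContain(long hash) {
    return bits.get((int) Math.floorMod(hash, (long) numBits));
  }

  long getBufferedSize() { return (numBits + 7) / 8; } // bytes backing the bit set
}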

@@ -244,6 +279,10 @@ public void flush() {
}
dataColumn.resetDictionary();
}

if (bloomFilterWriter != null && bloomFilter != null) {

Review comment: What should be done if bloomFilter is not null but bFW is null?

Author reply: Nothing needs to be done in that case, in my opinion; the Bloom filter information is simply lost, and the impact is minor.

bloomFilterWriter.writeBloomFilter(bloomFilter);
}
}

@Override
@@ -257,17 +296,21 @@ public void close() {

@Override
public long getBufferedSizeInMemory() {
long bloomBufferSize = bloomFilter == null ? 0 : bloomFilter.getBufferedSize();
  return repetitionLevelColumn.getBufferedSize()
      + definitionLevelColumn.getBufferedSize()
      + dataColumn.getBufferedSize()
-     + pageWriter.getMemSize();
+     + pageWriter.getMemSize()
+     + bloomBufferSize;
}

public long allocatedSize() {
long bloomAllocatedSize = bloomFilter == null ? 0 : bloomFilter.getBufferedSize();
return repetitionLevelColumn.getAllocatedSize()
-     + definitionLevelColumn.getAllocatedSize()
-     + dataColumn.getAllocatedSize()
-     + pageWriter.allocatedSize();
+     + definitionLevelColumn.getAllocatedSize()
+     + dataColumn.getAllocatedSize()
+     + pageWriter.allocatedSize()
+     + bloomAllocatedSize;
}

public String memUsageString(String indent) {