Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion parquet-column/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>

<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -29,14 +29,17 @@
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.schema.MessageType;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* This class represents all the configurable Parquet properties.
Expand All @@ -52,6 +55,7 @@ public class ParquetProperties {
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;

public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

Expand Down Expand Up @@ -94,13 +98,16 @@ public static WriterVersion fromString(String name) {

// The key-value pair represents the column name and its expected distinct number of values in a row group.
private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
private final int maxBloomFilterBytes;
private final Set<String> bloomFilterColumns;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
boolean pageWriteChecksumEnabled, Map<String, Long> bloomFilterExpectedDistinctNumber) {
boolean pageWriteChecksumEnabled, Map<String, Long> bloomFilterExpectedDistinctNumber,
Set<String> bloomFilterColumns, int maxBloomFilterBytes) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
Expand All @@ -115,6 +122,8 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber;
this.bloomFilterColumns = bloomFilterColumns;
this.maxBloomFilterBytes = maxBloomFilterBytes;
this.pageRowCountLimit = pageRowCountLimit;
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
}
Expand Down Expand Up @@ -178,12 +187,13 @@ public ByteBufferAllocator getAllocator() {
}

public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
PageWriteStore pageStore,
BloomFilterWriteStore bloomFilterWriteStore) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(schema, pageStore, this);
return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
case PARQUET_2_0:
return new ColumnWriteStoreV2(schema, pageStore, this);
return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
Expand Down Expand Up @@ -221,6 +231,14 @@ public Map<String, Long> getBloomFilterColumnExpectedNDVs() {
return bloomFilterExpectedDistinctNumbers;
}

/**
 * @return the set of column names that have a Bloom filter enabled
 */
// NOTE(review): returns the internal set without a defensive copy — callers must not mutate it.
public Set<String> getBloomFilterColumns() {
return bloomFilterColumns;
}

/**
 * @return the maximum number of bytes a Bloom filter bitset may occupy
 *         (defaults to {@code DEFAULT_MAX_BLOOM_FILTER_BYTES})
 */
public int getMaxBloomFilterBytes() {
return maxBloomFilterBytes;
}

/**
 * @return a new {@code Builder} initialized with the default property values
 */
public static Builder builder() {
return new Builder();
}
Expand All @@ -241,6 +259,8 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
private Set<String> bloomFilterColumns = new HashSet<>();
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;

Expand All @@ -260,6 +280,8 @@ private Builder(ParquetProperties toCopy) {
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
this.bloomFilterColumns = toCopy.bloomFilterColumns;
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
}

/**
Expand Down Expand Up @@ -349,12 +371,34 @@ public Builder withColumnIndexTruncateLength(int length) {
}

/**
* Set the maximum number of bytes for a Bloom filter bitset of a column.
*
* @param maxBloomFilterBytes the max bytes of a Bloom filter bitset for a column.
* @return this builder for method chaining
*/
// Overrides DEFAULT_MAX_BLOOM_FILTER_BYTES; the value is not validated here.
public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
this.maxBloomFilterBytes = maxBloomFilterBytes;
return this;
}

/**
* Set Bloom filter column names.
*
* @param columns the columns which has bloom filter enabled.
* @return this builder for method chaining
*/
// NOTE(review): the set reference is stored without copying — later mutation by the
// caller will be visible to the built ParquetProperties.
public Builder withBloomFilterColumnNames(Set<String> columns) {
this.bloomFilterColumns = columns;
return this;
}

/**
* Set expected columns distinct number for Bloom filter.
*
* @param columnExpectedNDVs the columns expected number of distinct values in a row group
* @return this builder for method chaining
*/
public Builder withBloomFilterInfo(Map<String, Long> columnExpectedNDVs) {
public Builder withBloomFilterColumnNdvs(Map<String, Long> columnExpectedNDVs) {
this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs;
return this;
}
Expand All @@ -375,7 +419,8 @@ public ParquetProperties build() {
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs);
pageRowCountLimit, pageWriteChecksumEnabled, bloomFilterColumnExpectedNDVs,
bloomFilterColumns, maxBloomFilterBytes);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;

Expand All @@ -31,6 +32,12 @@ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, Par
super(schema, pageWriteStore, props);
}

/**
 * Creates a V2 column write store that also writes Bloom filters.
 *
 * @param schema the schema of the records to be written
 * @param pageWriteStore the store that receives the written pages
 * @param bloomFilterWriteStore the store that receives the written Bloom filters
 * @param props the writer configuration
 */
public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
BloomFilterWriteStore bloomFilterWriteStore,
ParquetProperties props) {
super(schema, pageWriteStore, bloomFilterWriteStore, props);
}

@Override
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
Expand Down Expand Up @@ -85,14 +86,27 @@ abstract class ColumnWriterBase implements ColumnWriter {
if (path.getPath().length != 1 || bloomFilterWriter == null) {
return;
}
String column = path.getPath()[0];

this.bloomFilterWriter = bloomFilterWriter;
Set<String> bloomFilterColumns = props.getBloomFilterColumns();
if (!bloomFilterColumns.contains(column)) {
return;
}
int maxBloomFilterSize = props.getMaxBloomFilterBytes();

Map<String, Long> bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs();
String column = path.getPath()[0];
if (bloomFilterColumnExpectedNDVs.keySet().contains(column)) {
int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
BlockSplitBloomFilter.DEFAULT_FPP);
this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits/8);
if (bloomFilterColumnExpectedNDVs.size() > 0) {
// If user specify the column NDV, we construct Bloom filter from it.
if (bloomFilterColumnExpectedNDVs.keySet().contains(column)) {
int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
BlockSplitBloomFilter.DEFAULT_FPP);

this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
}
}
else {
this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Loading