18 changes: 18 additions & 0 deletions api/src/main/java/org/apache/iceberg/data/Record.java
@@ -58,4 +58,22 @@ default Record copy(String field1, Object value1, String field2, Object value2,
return copy(overwriteValues);
}

default Record copy(String field1, Object value1, String field2, Object value2, String field3, Object value3,
String field4, Object value4, String field5, Object value5, String field6, Object value6,
String field7, Object value7, String field8, Object value8, String field9, Object value9,
String field10, Object value10, String field11, Object value11) {
Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(11);
overwriteValues.put(field1, value1);
overwriteValues.put(field2, value2);
overwriteValues.put(field3, value3);
overwriteValues.put(field4, value4);
overwriteValues.put(field5, value5);
overwriteValues.put(field6, value6);
overwriteValues.put(field7, value7);
overwriteValues.put(field8, value8);
overwriteValues.put(field9, value9);
overwriteValues.put(field10, value10);
overwriteValues.put(field11, value11);
return copy(overwriteValues);
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -167,6 +167,16 @@ private TableProperties() {
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String DEFAULT_PARQUET_BLOOM_FILTER_ENABLED = "write.parquet.bloom-filter-enabled.default";
Contributor:
does it ever make sense to enable bloom filter for all columns? should we only allow bloom filter for explicitly specified columns?

Contributor Author:
It is not common to enable the bloom filter for all columns, but it is legal. This is consistent with the parquet-mr bloom filter implementation.

Contributor:
I agree with Steven. I think it only makes sense to enable bloom filters for some columns.

Contributor:
+1. While it's consistent with the parquet-mr bloom filter implementation, we need to think of user experience first and foremost.

It doesn't make sense to enable bloom filters for a lot of columns. And many users don't do any tuning of their metadata / statistics.

I think it's in line with other things we do to make the user experience better, like turning off column-level statistics after a certain number of columns. We can point it out in the docs under a big !!!NOTE (that's highlighted) that the bloom filter is only used when it is turned on.

It's really an advanced thing to use at all, imo.

Contributor Author:
Sounds good. I have removed DEFAULT_PARQUET_BLOOM_FILTER_ENABLED. Now users need to enable the bloom filter for each individual column using PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX. If the column is a complex type, the field inside the complex type must be enabled, for example:
set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "struct_col.int_field", "true")
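For illustration, here is a minimal sketch of how a user might set these per-column properties through the table API. The Table handle named table, the column id, and the helper class are hypothetical; only the property prefix comes from this PR.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class BloomFilterEnableExample {
  // Hypothetical helper: enables the Parquet bloom filter only for the "id" column
  // and for a field nested inside a struct, using the per-column prefix from this PR.
  static void enableBloomFilters(Table table) {
    table.updateProperties()
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "struct_col.int_field", "true")
        .commit();
  }
}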

public static final boolean DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT = false;

public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";
public static final String PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX =
Collaborator:
It would be better to document that the NDV is specific to a Parquet file.

Contributor Author:
Thanks for your comment. I documented this in configuration.md.
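As a companion sketch (same hypothetical table and id column as above), the expected-NDV hint can be set next to the enable flag; per the discussion here, the NDV applies to each Parquet data file, not to the table as a whole.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class BloomFilterNdvExample {
  // Hypothetical helper: hints the expected number of distinct values for "id" so the
  // writer can size its bloom filter; the hint applies per Parquet file written.
  static void setExpectedNdv(Table table) {
    table.updateProperties()
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX + "id", "1000000")
        .commit();
  }
}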

"write.parquet.bloom-filter-expected-ndv.column.";

public static final String PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes";
public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;

public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
13 changes: 13 additions & 0 deletions data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -42,9 +42,14 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED;
import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;

public class FileHelpers {
private FileHelpers() {
@@ -115,6 +120,14 @@ public static DataFile writeDataFile(Table table, OutputFile out, StructLike par
throws IOException {
FileFormat format = defaultFormat(table.properties());
GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());
boolean useBloomFilter = PropertyUtil.propertyAsBoolean(table.properties(),
DEFAULT_PARQUET_BLOOM_FILTER_ENABLED,
DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT);
int blockSize = PropertyUtil.propertyAsInt(table.properties(),
PARQUET_ROW_GROUP_SIZE_BYTES,
PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
factory.set(DEFAULT_PARQUET_BLOOM_FILTER_ENABLED, Boolean.toString(useBloomFilter));
factory.set(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(blockSize));

FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
123 changes: 116 additions & 7 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -78,13 +78,19 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED;
import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -239,6 +245,10 @@ public <D> FileAppender<D> build() throws IOException {
CompressionCodecName codec = context.codec();
int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
boolean bloomFilterEnabled = context.bloomFilterEnabled();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
Map<String, String> columnBloomFilterNDVs = context.columnBloomFilterNDVs();

if (compressionLevel != null) {
switch (codec) {
@@ -269,19 +279,34 @@ public <D> FileAppender<D> build() throws IOException {
conf.set(entry.getKey(), entry.getValue());
}

ParquetProperties parquetProperties = ParquetProperties.builder()
ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
.build();
.withMaxBloomFilterBytes(bloomFilterMaxBytes)
.withBloomFilterEnabled(bloomFilterEnabled);

for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = entry.getKey();
String bloomEnabled = entry.getValue();
propsBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

for (Map.Entry<String, String> entry : columnBloomFilterNDVs.entrySet()) {
String colPath = entry.getKey();
String numDistinctValue = entry.getValue();
propsBuilder.withBloomFilterNDV(colPath, Long.valueOf(numDistinctValue));
}

ParquetProperties parquetProperties = propsBuilder.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
parquetProperties, metricsConfig, writeMode);
} else {
return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
ParquetWriteBuilder<D> parquetWriteBuilder = new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
.setType(type)
.setConfig(config)
@@ -292,11 +317,39 @@ public <D> FileAppender<D> build() throws IOException {
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build(),
// TODO: add .withMaxBloomFilterBytes(bloomFilterMaxBytes) once ParquetWriter.Builder supports it
.withBloomFilterEnabled(bloomFilterEnabled);

for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = entry.getKey();
String bloomEnabled = entry.getValue();
parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

for (Map.Entry<String, String> entry : columnBloomFilterNDVs.entrySet()) {
String colPath = entry.getKey();
String numDistinctValue = entry.getValue();
parquetWriteBuilder.withBloomFilterNDV(colPath, Long.valueOf(numDistinctValue));
}

return new ParquetWriteAdapter<>(
Contributor:
@aokolnychyi, what do you think about removing the old ParquetWriteAdapter code? I don't think that anyone uses it anymore.

parquetWriteBuilder.build(),
metricsConfig);
}
}

private static Map<String, String> getBloomColumnConfigMap(String prefix, Map<String, String> config) {
Map<String, String> columnBloomFilterConfig = Maps.newHashMap();
config.keySet().stream()
.filter(key -> key.startsWith(prefix))
.forEach(key -> {
String columnPath = key.replaceFirst(prefix, "");
Contributor:
Since this uses the column name in the config, is there any logic to update these configs when columns are renamed?

Contributor Author:
Good question. I don't have logic to update the configs when columns are renamed, but I think we are OK. On the write path, these configs are only used to write bloom filters at file creation time; they are not used for reads. On the read path, bloom filters are loaded by field id instead of column name, so if columns are renamed after the bloom filters have been written, the filters should still load fine as long as the ids are unchanged.

Contributor:
We're okay for the read path then, but we should consider how to configure the write side when columns are renamed.

String bloomFilterMode = config.get(key);
columnBloomFilterConfig.put(columnPath, bloomFilterMode);
});
return columnBloomFilterConfig;
}

private static class Context {
private final int rowGroupSize;
private final int pageSize;
@@ -305,17 +358,27 @@ private static class Context {
private final String compressionLevel;
private final int rowGroupCheckMinRecordCount;
private final int rowGroupCheckMaxRecordCount;
private final boolean bloomFilterEnabled;
private final int bloomFilterMaxBytes;
private final Map<String, String> columnBloomFilterEnabled;
private final Map<String, String> columnBloomFilterNDVs;

private Context(int rowGroupSize, int pageSize, int dictionaryPageSize,
CompressionCodecName codec, String compressionLevel,
int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount) {
int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount,
boolean bloomFilterEnabled, int bloomFilterMaxBytes,
Map<String, String> columnBloomFilterEnabled, Map<String, String> columnBloomFilterNDVs) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.dictionaryPageSize = dictionaryPageSize;
this.codec = codec;
this.compressionLevel = compressionLevel;
this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
this.bloomFilterEnabled = bloomFilterEnabled;
this.bloomFilterMaxBytes = bloomFilterMaxBytes;
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
this.columnBloomFilterNDVs = columnBloomFilterNDVs;
}

static Context dataContext(Map<String, String> config) {
@@ -348,8 +411,22 @@ static Context dataContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

boolean bloomFilterEnabled = PropertyUtil.propertyAsBoolean(config, DEFAULT_PARQUET_BLOOM_FILTER_ENABLED,
DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT);

int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");

Map<String, String> columnBloomFilterEnabled =
getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, config);

Map<String, String> columnBloomFilterNDVs =
getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX, config);

return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterEnabled, bloomFilterMaxBytes,
columnBloomFilterEnabled, columnBloomFilterNDVs);
}

static Context deleteContext(Map<String, String> config) {
@@ -385,8 +462,22 @@ static Context deleteContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

boolean bloomFilterEnabled = PropertyUtil.propertyAsBoolean(config, DEFAULT_PARQUET_BLOOM_FILTER_ENABLED,
DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT);

int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");

Map<String, String> columnBloomFilterEnabled =
getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, config);

Map<String, String> columnBloomFilterNDVs =
getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX, config);

return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterEnabled, bloomFilterMaxBytes,
columnBloomFilterEnabled, columnBloomFilterNDVs);
}

private static CompressionCodecName toCodec(String codecAsString) {
@@ -424,6 +515,22 @@ int rowGroupCheckMinRecordCount() {
int rowGroupCheckMaxRecordCount() {
return rowGroupCheckMaxRecordCount;
}

boolean bloomFilterEnabled() {
return bloomFilterEnabled;
}

int bloomFilterMaxBytes() {
return bloomFilterMaxBytes;
}

Map<String, String> columnBloomFilterEnabled() {
return columnBloomFilterEnabled;
}

Map<String, String> columnBloomFilterNDVs() {
return columnBloomFilterNDVs;
}
}
}

@@ -903,12 +1010,14 @@ public <D> CloseableIterable<D> build() {
Schema fileSchema = ParquetSchemaUtil.convert(type);
builder.useStatsFilter()
.useDictionaryFilter()
.useBloomFilter()
.useRecordFilter(filterRecords)
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
.useDictionaryFilter(false)
.useBloomFilter(false)
.useRecordFilter(false);
}
