9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -69,6 +69,15 @@ private TableProperties() {
public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
public static final String AVRO_COMPRESSION_DEFAULT = "gzip";

public static final String PARQUET_BLOOM_FILTER_ENABLED = "write.parquet.bloom-filter-enabled";
public static final boolean PARQUET_BLOOM_FILTER_ENABLED_DEFAULT = false;

public static final String PARQUET_BLOOM_FILTER_EXPECTED_NDV = "write.parquet.bloom-filter-expected-ndv";
public static final long PARQUET_BLOOM_FILTER_EXPECTED_NDV_DEFAULT = -1L;

public static final String PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes";
public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;

public static final String SPLIT_SIZE = "read.split.target-size";
public static final long SPLIT_SIZE_DEFAULT = 134217728; // 128 MB

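The three properties above follow the existing write.parquet.* naming convention. As a minimal sketch (not part of this diff; table is assumed to be an existing org.apache.iceberg.Table instance), they could be set through the standard property-update API:

table.updateProperties()
    .set(TableProperties.PARQUET_BLOOM_FILTER_ENABLED, "true")       // turn on bloom filters for new writes
    .set(TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES, "1048576")  // cap each filter at 1 MB (the default)
    .commit();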
82 changes: 82 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/ColumnConfigParser.java
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.parquet;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;

/**
* TODO: Once org.apache.parquet.hadoop.ColumnConfigParser is made public, this class should be replaced with it.
* Parses key-values of the form root.key#column.path from a {@link Configuration} object or a property map.
*/
class ColumnConfigParser {
Contributor (review comment):
Iceberg doesn't use the same names that Parquet would, and it also doesn't use a Configuration to store properties. We need to think about what would make sense for Iceberg here, and using # to delimit properties is probably too confusing.

I think that the properties proposed in this PR for global defaults make sense, like write.parquet.bloom-filter-enabled, although the NDV default is probably not useful given that we expect NDV to vary widely across fields. For the column-specific settings, I think we may want to follow the same pattern that is used by metrics collection. That embeds the column name in the property, like write.metadata.metrics.column.col1. This could be write.parquet.bloom-filter.col1.enabled or write.parquet.bloom-filter.col1.max-bytes.

Contributor Author (reply):
Ok, I will try to change the configuration pattern when I have time.

  private static class ConfigHelper<T> {
    private final String prefix;
    private final Function<String, T> function;
    private final BiConsumer<String, T> consumer;

    ConfigHelper(String prefix, Function<String, T> function, BiConsumer<String, T> consumer) {
      this.prefix = prefix;
      this.function = function;
      this.consumer = consumer;
    }

    public void processKey(String key) {
      if (key.startsWith(prefix)) {
        String columnPath = key.substring(prefix.length());
        T value = function.apply(key);
        consumer.accept(columnPath, value);
      }
    }
  }

  private final List<ConfigHelper<?>> helpers = new ArrayList<>();

  public <T> ColumnConfigParser withColumnConfig(String rootKey, Function<String, T> function,
      BiConsumer<String, T> consumer) {
    helpers.add(new ConfigHelper<T>(rootKey + '#', function, consumer));
    return this;
  }

  public void parseConfig(Configuration conf) {
    for (Map.Entry<String, String> entry : conf) {
      for (ConfigHelper<?> helper : helpers) {
        // Retrieve the value through the supplied function rather than parsing the raw string here, so that
        // the exact parsing implementations in Configuration are used
        helper.processKey(entry.getKey());
      }
    }
  }

  public void parseConfig(Map<String, String> conf) {
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      for (ConfigHelper<?> helper : helpers) {
        // As above, retrieve the value through the supplied function so the caller controls how the raw
        // string is parsed
        helper.processKey(entry.getKey());
      }
    }
  }
}
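A minimal usage sketch (not part of this diff): for a key such as write.parquet.bloom-filter-enabled#col1, the parser matches the registered root key, strips the # delimiter to recover the column path, applies the lookup function to the full key, and passes both to the consumer. Imports of java.util.Map and java.util.HashMap are assumed:

Map<String, String> props = new HashMap<>();
props.put("write.parquet.bloom-filter-enabled#col1", "true");

new ColumnConfigParser()
    .withColumnConfig(
        "write.parquet.bloom-filter-enabled",
        key -> Boolean.parseBoolean(props.get(key)),            // parse the value stored under the full key
        (column, enabled) -> System.out.println(column + " -> " + enabled))
    .parseConfig(props);
// prints: col1 -> true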
49 changes: 45 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -60,6 +60,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroReadSupport;
@@ -75,6 +76,12 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_ENABLED;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_EXPECTED_NDV;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_EXPECTED_NDV_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -205,6 +212,10 @@ public <D> FileAppender<D> build() throws IOException {
PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT));
String compressionLevel = config.getOrDefault(
PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT);
boolean bloomFilterEnabled = PropertyUtil.propertyAsBoolean(config, PARQUET_BLOOM_FILTER_ENABLED,
PARQUET_BLOOM_FILTER_ENABLED_DEFAULT);
int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);

if (compressionLevel != null) {
switch (codec()) {
@@ -241,17 +252,31 @@ public <D> FileAppender<D> build() throws IOException {
conf.set(entry.getKey(), entry.getValue());
}

ParquetProperties parquetProperties = ParquetProperties.builder()
ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build();
.withMaxBloomFilterBytes(bloomFilterMaxBytes)
.withBloomFilterEnabled(bloomFilterEnabled);

new ColumnConfigParser()
.withColumnConfig(
PARQUET_BLOOM_FILTER_ENABLED,
key -> conf.getBoolean(key, PARQUET_BLOOM_FILTER_ENABLED_DEFAULT),
propsBuilder::withBloomFilterEnabled)
.withColumnConfig(
PARQUET_BLOOM_FILTER_EXPECTED_NDV,
key -> conf.getLong(key, PARQUET_BLOOM_FILTER_EXPECTED_NDV_DEFAULT),
propsBuilder::withBloomFilterNDV)
.parseConfig(conf);
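// Example (assumed key format, mirroring ColumnConfigParser above): setting
// "write.parquet.bloom-filter-enabled#id" to "true" in the write config enables a
// bloom filter only for column "id", while "write.parquet.bloom-filter-expected-ndv#id"
// sizes that filter for the expected number of distinct values of "id".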

ParquetProperties parquetProperties = propsBuilder.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(),
parquetProperties, metricsConfig, writeMode);
} else {
return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
ParquetWriteBuilder<D> parquetWriteBuilder = new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
.setType(type)
.setConfig(config)
@@ -262,7 +287,21 @@ conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(),
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build(),
.withBloomFilterEnabled(bloomFilterEnabled);

new ColumnConfigParser()
.withColumnConfig(
PARQUET_BLOOM_FILTER_ENABLED,
key -> PropertyUtil.propertyAsBoolean(config, key, PARQUET_BLOOM_FILTER_ENABLED_DEFAULT),
parquetWriteBuilder::withBloomFilterEnabled)
.withColumnConfig(
PARQUET_BLOOM_FILTER_EXPECTED_NDV,
key -> PropertyUtil.propertyAsLong(config, key, PARQUET_BLOOM_FILTER_EXPECTED_NDV_DEFAULT),
parquetWriteBuilder::withBloomFilterNDV)
.parseConfig(config);

return new ParquetWriteAdapter<>(
parquetWriteBuilder.build(),
metricsConfig);
}
}
@@ -638,12 +677,14 @@ public <D> CloseableIterable<D> build() {
Schema fileSchema = ParquetSchemaUtil.convert(type);
builder.useStatsFilter()
.useDictionaryFilter()
.useBloomFilter()
.useRecordFilter(filterRecords)
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
.useDictionaryFilter(false)
.useBloomFilter(false)
.useRecordFilter(false);
}

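On the read side, the useBloomFilter calls above add bloom-filter row-group pruning alongside the existing stats and dictionary filters whenever a filter expression is supplied. A minimal read sketch (not part of this diff; inputFile and schema are hypothetical, and GenericParquetReaders comes from the iceberg-data module):

CloseableIterable<Record> records = Parquet.read(inputFile)
    .project(schema)
    .filter(Expressions.equal("id", 5))                 // a predicate that bloom filters can prune on
    .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
    .build();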