Add Parquet Row Group Bloom Filter Support #4831
`org/apache/iceberg/TableProperties.java`

```diff
@@ -167,6 +167,17 @@ private TableProperties() {
       "write.delete.parquet.row-group-check-max-record-count";
   public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;
 
+  public static final String DEFAULT_PARQUET_BLOOM_FILTER_ENABLED = "write.parquet.bloom-filter-enabled.default";
+
+  public static final boolean DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT = false;
+
+  public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";
+  public static final String PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX =
+      "write.parquet.bloom-filter-expected-ndv.column.";
+
+  public static final String PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes";
+  public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;
+
   public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
   public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
   public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
```
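For orientation, here is a minimal sketch of how a user could set these new properties through Iceberg's existing `UpdateProperties` API. The table handle and the `user_id` column name are illustrative assumptions, not part of this diff:

```java
import org.apache.iceberg.Table;

class BloomFilterPropertiesExample {
  // Hypothetical sketch: opt one column into bloom filters on an existing table.
  // The property keys mirror the constants added in this diff.
  static void enableBloomFilter(Table table) {
    table.updateProperties()
        .set("write.parquet.bloom-filter-enabled.column.user_id", "true")
        .set("write.parquet.bloom-filter-expected-ndv.column.user_id", "1000000")
        .set("write.parquet.bloom-filter-max-bytes", String.valueOf(1024 * 1024))
        .commit();
  }
}
```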
`org/apache/iceberg/parquet/Parquet.java`

```diff
@@ -78,13 +78,19 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED;
+import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
```
```diff
@@ -239,6 +245,13 @@ public <D> FileAppender<D> build() throws IOException {
       CompressionCodecName codec = context.codec();
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
+      boolean bloomFilterEnabled = PropertyUtil.propertyAsBoolean(config, DEFAULT_PARQUET_BLOOM_FILTER_ENABLED,
+          DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT);
+      int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
+          PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
+      Map<String, String> columnBloomFilterModes = getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+      Map<String, String> columnBloomFilterNDVs =
+          getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX);
 
       if (compressionLevel != null) {
         switch (codec) {
```
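As an aside on the lookups above, `PropertyUtil` reads a typed value from the string-keyed table property map and falls back to the supplied default when the key is absent. A minimal sketch, assuming Iceberg's relocated Guava `ImmutableMap`:

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;

class PropertyLookupExample {
  static void demo() {
    Map<String, String> config = ImmutableMap.of("write.parquet.bloom-filter-max-bytes", "2097152");

    // Explicit value wins: resolves to 2097152 (2 MiB)
    int maxBytes = PropertyUtil.propertyAsInt(config, "write.parquet.bloom-filter-max-bytes", 1024 * 1024);

    // Missing key falls back to the supplied default: resolves to false
    boolean enabled = PropertyUtil.propertyAsBoolean(config, "write.parquet.bloom-filter-enabled.default", false);
  }
}
```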
```diff
@@ -269,19 +282,34 @@ public <D> FileAppender<D> build() throws IOException {
         conf.set(entry.getKey(), entry.getValue());
       }
 
-      ParquetProperties parquetProperties = ParquetProperties.builder()
+      ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
           .withWriterVersion(writerVersion)
           .withPageSize(pageSize)
           .withDictionaryPageSize(dictionaryPageSize)
           .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
           .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
-          .build();
+          .withMaxBloomFilterBytes(bloomFilterMaxBytes)
+          .withBloomFilterEnabled(bloomFilterEnabled);
+
+      for (Map.Entry<String, String> entry : columnBloomFilterModes.entrySet()) {
+        String col = entry.getKey();
+        String value = entry.getValue();
+        propsBuilder.withBloomFilterEnabled(col, Boolean.valueOf(value));
+      }
+
+      for (Map.Entry<String, String> entry : columnBloomFilterNDVs.entrySet()) {
+        String col = entry.getKey();
+        String value = entry.getValue();
+        propsBuilder.withBloomFilterNDV(col, Long.valueOf(value));
+      }
+
+      ParquetProperties parquetProperties = propsBuilder.build();
 
       return new org.apache.iceberg.parquet.ParquetWriter<>(
           conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
           parquetProperties, metricsConfig, writeMode);
     } else {
-      return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
+      ParquetWriteBuilder<D> parquetWriteBuilder = new ParquetWriteBuilder<D>(ParquetIO.file(file))
           .withWriterVersion(writerVersion)
           .setType(type)
           .setConfig(config)
```
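For reference, the same per-column knobs can be exercised directly on parquet-mr's `ParquetProperties.Builder` (assuming a parquet-mr release with bloom filter support). The `user_id` column path is an illustrative assumption:

```java
import org.apache.parquet.column.ParquetProperties;

class ParquetPropertiesSketch {
  // Minimal sketch of the bloom filter settings used by the writer path above.
  static ParquetProperties build() {
    return ParquetProperties.builder()
        .withMaxBloomFilterBytes(1024 * 1024)      // cap each filter at 1 MiB
        .withBloomFilterEnabled(false)             // global default: off
        .withBloomFilterEnabled("user_id", true)   // opt in per column
        .withBloomFilterNDV("user_id", 1_000_000L) // size the filter for ~1M distinct values
        .build();
  }
}
```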
```diff
@@ -292,11 +320,39 @@ public <D> FileAppender<D> build() throws IOException {
           .withRowGroupSize(rowGroupSize)
           .withPageSize(pageSize)
           .withDictionaryPageSize(dictionaryPageSize)
-          .build(),
+          // TODO: add .withMaxBloomFilterBytes(bloomFilterMaxBytes) once ParquetWriter.Builder supports it
+          .withBloomFilterEnabled(bloomFilterEnabled);
+
+      for (Map.Entry<String, String> entry : columnBloomFilterModes.entrySet()) {
+        String col = entry.getKey();
+        String value = entry.getValue();
+        parquetWriteBuilder.withBloomFilterEnabled(col, Boolean.valueOf(value));
+      }
+
+      for (Map.Entry<String, String> entry : columnBloomFilterNDVs.entrySet()) {
+        String col = entry.getKey();
+        String value = entry.getValue();
+        parquetWriteBuilder.withBloomFilterNDV(col, Long.valueOf(value));
+      }
+
+      return new ParquetWriteAdapter<>(
+          parquetWriteBuilder.build(),
           metricsConfig);
     }
   }
 
+  private Map<String, String> getBloomColumnConfigMap(String prefix) {
+    Map<String, String> columnBloomFilterModes = Maps.newHashMap();
+    config.keySet().stream()
+        .filter(key -> key.startsWith(prefix))
+        .forEach(key -> {
+          String columnAlias = key.replaceFirst(prefix, "");
+          String bloomFilterMode = config.get(key);
+          columnBloomFilterModes.put(columnAlias, bloomFilterMode);
+        });
+    return columnBloomFilterModes;
+  }
+
   private static class Context {
     private final int rowGroupSize;
     private final int pageSize;
```

Contributor review comment: "@aokolnychyi, what do you think about removing the old …"
|
@@ -903,12 +959,14 @@ public <D> CloseableIterable<D> build() { | |
| Schema fileSchema = ParquetSchemaUtil.convert(type); | ||
| builder.useStatsFilter() | ||
| .useDictionaryFilter() | ||
| .useBloomFilter() | ||
| .useRecordFilter(filterRecords) | ||
| .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive)); | ||
| } else { | ||
| // turn off filtering | ||
| builder.useStatsFilter(false) | ||
| .useDictionaryFilter(false) | ||
| .useBloomFilter(false) | ||
| .useRecordFilter(false); | ||
| } | ||
|
|
||
|
|
||
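For intuition about what `useBloomFilter()` buys the reader: each row group may carry a compact per-column bloom filter, and an equality predicate that probes it can skip the whole row group on a negative answer, since bloom filters have no false negatives (only false positives). A simplified, self-contained illustration of that membership contract (not Parquet's block-split implementation):

```java
import java.util.BitSet;

// Simplified bloom filter, for intuition only: Parquet uses a block-split bloom
// filter with xxHash, but the membership contract shown here is the same.
class SimpleBloomFilter {
  private final BitSet bits;
  private final int size;

  SimpleBloomFilter(int sizeInBits) {
    this.size = sizeInBits;
    this.bits = new BitSet(sizeInBits);
  }

  void add(Object value) {
    bits.set(Math.floorMod(value.hashCode(), size));
    bits.set(Math.floorMod(value.hashCode() * 31 + 17, size));
  }

  // false => value is definitely absent, so the row group can be skipped;
  // true  => value *might* be present (false positives are possible)
  boolean mightContain(Object value) {
    return bits.get(Math.floorMod(value.hashCode(), size))
        && bits.get(Math.floorMod(value.hashCode() * 31 + 17, size));
  }
}
```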