Add Parquet Row Group Bloom Filter Support #4831
```diff
@@ -167,6 +167,16 @@ private TableProperties() {
       "write.delete.parquet.row-group-check-max-record-count";
   public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;
 
+  public static final String DEFAULT_PARQUET_BLOOM_FILTER_ENABLED = "write.parquet.bloom-filter-enabled.default";
+  public static final boolean DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT = false;
+
+  public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";
+  public static final String PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX =
+      "write.parquet.bloom-filter-expected-ndv.column.";
+
+  public static final String PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes";
+  public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;
+
   public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
   public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
   public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
```
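The two `*_PREFIX` constants are completed with a column name to form the full property key, and the helper added later in this PR strips the prefix back off to recover the column name. A minimal sketch of how a per-column key is composed; the column name `id` is only an example:

```java
import org.apache.iceberg.TableProperties;

public class BloomPropertyKeys {
  public static void main(String[] args) {
    // Hypothetical column name used purely for illustration.
    String column = "id";
    // Per-column keys are formed by appending the column name to the prefix constants.
    String enabledKey = TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + column;
    String ndvKey = TableProperties.PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX + column;
    System.out.println(enabledKey); // write.parquet.bloom-filter-enabled.column.id
    System.out.println(ndvKey);     // write.parquet.bloom-filter-expected-ndv.column.id
  }
}
```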
@@ -50,6 +50,10 @@ Iceberg tables support table properties to configure table behavior, like the de…

| Property | Default | Description |
|---|---|---|
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
| write.parquet.bloom-filter-enabled.default | false | Whether to enable writing bloom filters for all columns |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Whether to enable writing a bloom filter for column 'col1', allowing per-column configuration. This property overrides `bloom-filter-enabled.default` for the specified column. For example, setting both `write.parquet.bloom-filter-enabled.default=true` and `write.parquet.bloom-filter-enabled.column.some_col=false` enables bloom filters for all columns except `some_col` |
| write.parquet.bloom-filter-expected-ndv.column.col1 | (not set) | The expected number of distinct values in a column, used to compute the optimal size of the bloom filter. Note that the NDV is specific to each Parquet file. If this property is not set, the bloom filter uses the maximum size set in `bloom-filter-max-bytes`. If this property is set for a column, the bloom filter does not also need to be enabled for that column with a `write.parquet.bloom-filter-enabled` property |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
Review thread on the bloom filter sizing properties:

**Contributor:** What is the behavior of this? If the NDV requires a size that is too large, does it skip writing the bloom filter?

**Contributor (Author):** If the NDV requires a size that is too large, Parquet still writes the bloom filter using the max bytes set by this property, not the bitset size calculated from the NDV.

**Contributor:** I guess there probably isn't much we can do about this, although that behavior makes no sense to me. Is it possible to set the expected false positive probability anywhere? Or is that hard-coded in the Parquet library?

**Contributor (Author):** There isn't a property to set the fpp in Parquet.

**Contributor:** What fpp is used by Parquet?

**Contributor (Author):** Parquet uses 0.01 for the fpp.
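For context on the sizing discussion above, the standard bloom filter sizing math relates the expected NDV `n` and false positive probability `p` to a number of bits. A minimal sketch under those standard formulas, not the exact Parquet implementation; it only illustrates why a larger NDV demands a larger bitset and why the result is capped by `write.parquet.bloom-filter-max-bytes`:

```java
public class BloomSizing {
  public static void main(String[] args) {
    long ndv = 1_000_000L;        // expected distinct values (example value)
    double fpp = 0.01;            // Parquet's default false positive probability, per the discussion above
    long maxBytes = 1024 * 1024;  // write.parquet.bloom-filter-max-bytes default

    // Standard bloom filter sizing: bits = -n * ln(p) / (ln 2)^2.
    double bits = -ndv * Math.log(fpp) / (Math.log(2) * Math.log(2));
    long bytes = (long) Math.ceil(bits / 8);
    long actual = Math.min(bytes, maxBytes);

    System.out.printf("optimal ~= %d bytes, capped at %d bytes -> %d bytes%n", bytes, maxBytes, actual);
  }
}
```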
The configuration table continues with the unchanged context rows:

| write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed |
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
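To make the new options concrete, here is a minimal sketch of enabling bloom filters through table properties with the Iceberg API. The table loading, the column names `id` and `some_col`, and the NDV value are assumptions for illustration; engines typically expose the same keys through their own `ALTER TABLE ... SET TBLPROPERTIES` syntax as well:

```java
import org.apache.iceberg.Table;

public class EnableBloomFilters {
  // Assumes 'table' was loaded from a catalog elsewhere; shown only to illustrate the property keys.
  static void enableBloomFilters(Table table) {
    table.updateProperties()
        // turn bloom filters on for every column...
        .set("write.parquet.bloom-filter-enabled.default", "true")
        // ...except this one (the per-column setting overrides the default)
        .set("write.parquet.bloom-filter-enabled.column.some_col", "false")
        // size the filter for 'id' from an expected distinct-value count
        .set("write.parquet.bloom-filter-expected-ndv.column.id", "1000000")
        .commit();
  }
}
```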
```diff
@@ -78,13 +78,19 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED;
+import static org.apache.iceberg.TableProperties.DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
```

```diff
@@ -239,6 +245,10 @@ public <D> FileAppender<D> build() throws IOException {
       CompressionCodecName codec = context.codec();
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
+      boolean bloomFilterEnabled = context.bloomFilterEnabled();
+      int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
+      Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
+      Map<String, String> columnBloomFilterNDVs = context.columnBloomFilterNDVs();
 
       if (compressionLevel != null) {
         switch (codec) {
```

```diff
@@ -269,19 +279,34 @@ public <D> FileAppender<D> build() throws IOException {
         conf.set(entry.getKey(), entry.getValue());
       }
 
-      ParquetProperties parquetProperties = ParquetProperties.builder()
+      ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
           .withWriterVersion(writerVersion)
           .withPageSize(pageSize)
           .withDictionaryPageSize(dictionaryPageSize)
           .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
           .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
-          .build();
+          .withMaxBloomFilterBytes(bloomFilterMaxBytes)
+          .withBloomFilterEnabled(bloomFilterEnabled);
+
+      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+        String colPath = entry.getKey();
+        String bloomEnabled = entry.getValue();
+        propsBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
+      }
+
+      for (Map.Entry<String, String> entry : columnBloomFilterNDVs.entrySet()) {
+        String colPath = entry.getKey();
+        String numDistinctValue = entry.getValue();
+        propsBuilder.withBloomFilterNDV(colPath, Long.valueOf(numDistinctValue));
+      }
+
+      ParquetProperties parquetProperties = propsBuilder.build();
 
       return new org.apache.iceberg.parquet.ParquetWriter<>(
           conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
           parquetProperties, metricsConfig, writeMode);
     } else {
-      return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
+      ParquetWriteBuilder<D> parquetWriteBuilder = new ParquetWriteBuilder<D>(ParquetIO.file(file))
           .withWriterVersion(writerVersion)
           .setType(type)
           .setConfig(config)
```
```diff
@@ -292,11 +317,39 @@ public <D> FileAppender<D> build() throws IOException {
           .withRowGroupSize(rowGroupSize)
           .withPageSize(pageSize)
           .withDictionaryPageSize(dictionaryPageSize)
-          .build(),
+          // TODO: add .withMaxBloomFilterBytes(bloomFilterMaxBytes) once ParquetWriter.Builder supports it
+          .withBloomFilterEnabled(bloomFilterEnabled);
+
+      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+        String colPath = entry.getKey();
+        String bloomEnabled = entry.getValue();
+        parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
+      }
+
+      for (Map.Entry<String, String> entry : columnBloomFilterNDVs.entrySet()) {
+        String colPath = entry.getKey();
+        String numDistinctValue = entry.getValue();
+        parquetWriteBuilder.withBloomFilterNDV(colPath, Long.valueOf(numDistinctValue));
+      }
+
+      return new ParquetWriteAdapter<>(
```
**Contributor:** @aokolnychyi, what do you think about removing the old …
```diff
+          parquetWriteBuilder.build(),
           metricsConfig);
     }
   }
 
+  private static Map<String, String> getBloomColumnConfigMap(String prefix, Map<String, String> config) {
+    Map<String, String> columnBloomFilterConfig = Maps.newHashMap();
+    config.keySet().stream()
+        .filter(key -> key.startsWith(prefix))
+        .forEach(key -> {
+          String columnPath = key.replaceFirst(prefix, "");
```
**Contributor:** Since this uses the column name in the config, is there any logic to update these configs when columns are renamed?

**Contributor (Author):** Good question. I don't have logic to update the configs when columns are renamed. I think we are OK, though. On the write path, these configs are only used to write bloom filters at file creation time; they are not used again for reads. On the read path, the bloom filters are loaded using the field id instead of the column name. If columns are renamed after the bloom filters have been written, as long as the ids stay the same, the bloom filters should still load correctly.

**Contributor:** We're okay for adding read support, but we should consider how to configure write support then.
```diff
+          String bloomFilterMode = config.get(key);
+          columnBloomFilterConfig.put(columnPath, bloomFilterMode);
+        });
+    return columnBloomFilterConfig;
+  }
 
   private static class Context {
     private final int rowGroupSize;
     private final int pageSize;
```
```diff
@@ -305,17 +358,27 @@ private static class Context {
     private final String compressionLevel;
     private final int rowGroupCheckMinRecordCount;
     private final int rowGroupCheckMaxRecordCount;
+    private final boolean bloomFilterEnabled;
+    private final int bloomFilterMaxBytes;
+    private final Map<String, String> columnBloomFilterEnabled;
+    private final Map<String, String> columnBloomFilterNDVs;
 
     private Context(int rowGroupSize, int pageSize, int dictionaryPageSize,
                     CompressionCodecName codec, String compressionLevel,
-                    int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount) {
+                    int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount,
+                    boolean bloomFilterEnabled, int bloomFilterMaxBytes,
+                    Map<String, String> columnBloomFilterEnabled, Map<String, String> columnBloomFilterNDVs) {
       this.rowGroupSize = rowGroupSize;
       this.pageSize = pageSize;
       this.dictionaryPageSize = dictionaryPageSize;
       this.codec = codec;
       this.compressionLevel = compressionLevel;
       this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
       this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
+      this.bloomFilterEnabled = bloomFilterEnabled;
+      this.bloomFilterMaxBytes = bloomFilterMaxBytes;
+      this.columnBloomFilterEnabled = columnBloomFilterEnabled;
+      this.columnBloomFilterNDVs = columnBloomFilterNDVs;
     }
 
     static Context dataContext(Map<String, String> config) {
```
```diff
@@ -348,8 +411,22 @@ static Context dataContext(Map<String, String> config) {
       Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
           "Row group check maximum record count must be >= minimal record count");
 
+      boolean bloomFilterEnabled = PropertyUtil.propertyAsBoolean(config, DEFAULT_PARQUET_BLOOM_FILTER_ENABLED,
+          DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT);
+
+      int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
+          PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
+      Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");
+
+      Map<String, String> columnBloomFilterEnabled =
+          getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, config);
+
+      Map<String, String> columnBloomFilterNDVs =
+          getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX, config);
+
       return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
-          rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
+          rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterEnabled, bloomFilterMaxBytes,
+          columnBloomFilterEnabled, columnBloomFilterNDVs);
     }
 
     static Context deleteContext(Map<String, String> config) {
```
```diff
@@ -385,8 +462,22 @@ static Context deleteContext(Map<String, String> config) {
       Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
           "Row group check maximum record count must be >= minimal record count");
 
+      boolean bloomFilterEnabled = PropertyUtil.propertyAsBoolean(config, DEFAULT_PARQUET_BLOOM_FILTER_ENABLED,
+          DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT);
+
+      int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
+          PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
+      Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");
+
+      Map<String, String> columnBloomFilterEnabled =
+          getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, config);
+
+      Map<String, String> columnBloomFilterNDVs =
+          getBloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX, config);
+
       return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
-          rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
+          rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterEnabled, bloomFilterMaxBytes,
+          columnBloomFilterEnabled, columnBloomFilterNDVs);
     }
 
     private static CompressionCodecName toCodec(String codecAsString) {
```
```diff
@@ -424,6 +515,22 @@ int rowGroupCheckMinRecordCount() {
     int rowGroupCheckMaxRecordCount() {
       return rowGroupCheckMaxRecordCount;
     }
+
+    boolean bloomFilterEnabled() {
+      return bloomFilterEnabled;
+    }
+
+    int bloomFilterMaxBytes() {
+      return bloomFilterMaxBytes;
+    }
+
+    Map<String, String> columnBloomFilterEnabled() {
+      return columnBloomFilterEnabled;
+    }
+
+    Map<String, String> columnBloomFilterNDVs() {
+      return columnBloomFilterNDVs;
+    }
   }
 }
```
```diff
@@ -903,12 +1010,14 @@ public <D> CloseableIterable<D> build() {
         Schema fileSchema = ParquetSchemaUtil.convert(type);
         builder.useStatsFilter()
             .useDictionaryFilter()
+            .useBloomFilter()
             .useRecordFilter(filterRecords)
             .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
       } else {
         // turn off filtering
         builder.useStatsFilter(false)
             .useDictionaryFilter(false)
+            .useBloomFilter(false)
             .useRecordFilter(false);
       }
```
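Downstream effect of the read-side change above: when a reader is built with a filter expression, the bloom filter check can now help skip row groups whose filters report a value as definitely absent. A minimal sketch of such a read using Iceberg's generic data path; the input file, schema, and the `id = 42` predicate are placeholders for illustration, not part of this PR:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

class BloomFilteredRead {
  // Illustrative only: the input file and schema are assumed to come from the table being read.
  static CloseableIterable<Record> read(InputFile inputFile, Schema schema) {
    return Parquet.read(inputFile)
        .project(schema)
        // Row groups whose bloom filter proves 'id' never equals 42 can be skipped.
        .filter(Expressions.equal("id", 42L))
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build();
  }
}
```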