3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -167,6 +167,9 @@ private TableProperties() {
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

+ public static final String PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes";
+ public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;
+
public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";

public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
2 changes: 2 additions & 0 deletions docs/configuration.md
@@ -55,6 +55,8 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
+ | write.parquet.bloom-filter-enabled.column.col1 | (not set) | Enables writing a bloom filter for the column: col1 |
+ | write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
| write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed |
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
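For reference, these properties can be set like any other Iceberg table property. A minimal sketch, assuming an already-loaded `Table` handle and a hypothetical column named `id`:

```java
import org.apache.iceberg.Table;

class BloomFilterPropertiesExample {
  // Enable a bloom filter for column "id" and cap each filter's bitset at 1 MB
  // (the default). Property values are strings, per the table-property format.
  static void enableBloomFilter(Table table) {
    table.updateProperties()
        .set("write.parquet.bloom-filter-enabled.column.id", "true")
        .set("write.parquet.bloom-filter-max-bytes", "1048576")
        .commit();
  }
}
```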
68 changes: 55 additions & 13 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -86,6 +86,8 @@
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+ import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
+ import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -240,6 +242,8 @@ public <D> FileAppender<D> build() throws IOException {
CompressionCodecName codec = context.codec();
int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
+ int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
+ Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();

if (compressionLevel != null) {
switch (codec) {
@@ -270,13 +274,21 @@ public <D> FileAppender<D> build() throws IOException {
conf.set(entry.getKey(), entry.getValue());
}

- ParquetProperties parquetProperties = ParquetProperties.builder()
+ ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
- .build();
+ .withMaxBloomFilterBytes(bloomFilterMaxBytes);
+
+ for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+ String colPath = entry.getKey();
+ String bloomEnabled = entry.getValue();
+ propsBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
+ }
+
+ ParquetProperties parquetProperties = propsBuilder.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
@@ -293,15 +305,13 @@ public <D> FileAppender<D> build() throws IOException {
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize);
- // Todo: The following code needs to be improved in the bloom filter write path PR.
- for (Map.Entry<String, String> entry : config.entrySet()) {
- String key = entry.getKey();
- if (key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX)) {
- String columnPath = key.replaceFirst(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, "");
- String value = entry.getValue();
- parquetWriteBuilder.withBloomFilterEnabled(columnPath, Boolean.valueOf(value));
- }
- }

+ for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+ String colPath = entry.getKey();
+ String bloomEnabled = entry.getValue();
+ parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
+ }

return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
}
}
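To see how the `config` map reaches `dataContext(...)` below: a writer sketch that forwards table properties through `setAll(...)`, so the `write.parquet.bloom-filter-*` keys flow into the appender. The local path and the generic writer function are illustrative assumptions, not part of this PR:

```java
import java.io.File;
import java.io.IOException;
import org.apache.iceberg.Files;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;

class BloomFilterWriteExample {
  static FileAppender<Record> newAppender(Table table) throws IOException {
    return Parquet.write(Files.localOutput(new File("/tmp/data.parquet"))) // illustrative path
        .schema(table.schema())
        .setAll(table.properties()) // carries the bloom filter properties into dataContext()
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .build();
  }
}
```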
@@ -314,17 +324,23 @@ private static class Context {
private final String compressionLevel;
private final int rowGroupCheckMinRecordCount;
private final int rowGroupCheckMaxRecordCount;
+ private final int bloomFilterMaxBytes;
+ private final Map<String, String> columnBloomFilterEnabled;

private Context(int rowGroupSize, int pageSize, int dictionaryPageSize,
CompressionCodecName codec, String compressionLevel,
- int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount) {
+ int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount,
+ int bloomFilterMaxBytes,
+ Map<String, String> columnBloomFilterEnabled) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.dictionaryPageSize = dictionaryPageSize;
this.codec = codec;
this.compressionLevel = compressionLevel;
this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
+ this.bloomFilterMaxBytes = bloomFilterMaxBytes;
+ this.columnBloomFilterEnabled = columnBloomFilterEnabled;
}

static Context dataContext(Map<String, String> config) {
@@ -357,8 +373,16 @@ static Context dataContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

+ int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
+ PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
+ Preconditions.checkArgument(bloomFilterMaxBytes > 0, "Bloom filter max bytes must be > 0");
+
+ Map<String, String> columnBloomFilterEnabled =
+ PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+
return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
- rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
+ rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterMaxBytes,
+ columnBloomFilterEnabled);
}
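The per-column map comes from `PropertyUtil.propertiesWithPrefix`. A minimal sketch of its assumed behavior, stripping the prefix so only the column name remains as the key:

```java
import java.util.HashMap;
import java.util.Map;

class PropertiesWithPrefixSketch {
  // Assumed behavior: "write.parquet.bloom-filter-enabled.column.col1" -> "true"
  // becomes {"col1" -> "true"}, which maps directly onto Parquet column paths.
  static Map<String, String> propertiesWithPrefix(Map<String, String> properties, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }
}
```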

static Context deleteContext(Map<String, String> config) {
@@ -394,8 +418,16 @@ static Context deleteContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

+ int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
+ PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
+ Preconditions.checkArgument(bloomFilterMaxBytes > 0, "Bloom filter max bytes must be > 0");
+
+ Map<String, String> columnBloomFilterEnabled =
+ PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+
return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
- rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
+ rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterMaxBytes,
+ columnBloomFilterEnabled);
}

private static CompressionCodecName toCodec(String codecAsString) {
@@ -433,6 +465,14 @@ int rowGroupCheckMinRecordCount() {
int rowGroupCheckMaxRecordCount() {
return rowGroupCheckMaxRecordCount;
}

+ int bloomFilterMaxBytes() {
+ return bloomFilterMaxBytes;
+ }
+
+ Map<String, String> columnBloomFilterEnabled() {
+ return columnBloomFilterEnabled;
+ }
}
}

@@ -913,11 +953,13 @@ public <D> CloseableIterable<D> build() {
builder.useStatsFilter()
.useDictionaryFilter()
.useRecordFilter(filterRecords)
+ .useBloomFilter()
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
.useDictionaryFilter(false)
+ .useBloomFilter(false)
.useRecordFilter(false);
}
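On the read side, `useBloomFilter()` lets parquet-mr skip row groups whose bloom filter proves a predicate cannot match. A hedged usage sketch; the file path, the `id` column, and the generic reader function are illustrative assumptions:

```java
import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.Parquet;

class BloomFilterReadExample {
  // With a non-null filter (and record filtering on by default), the builder
  // above enables the stats, dictionary, and bloom filter row-group checks.
  static CloseableIterable<Record> scan(Schema schema) {
    return Parquet.read(Files.localInput(new File("/tmp/data.parquet"))) // illustrative path
        .project(schema)
        .filter(Expressions.equal("id", 42L))
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build();
  }
}
```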

@@ -39,6 +39,7 @@
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
+ import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -224,7 +225,7 @@ private void startRowGroup() {
compressor, parquetSchema, props.getAllocator(), this.columnIndexTruncateLength);

this.flushPageStoreToWriter = flushToWriter.bind(pageStore);
- this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore);
+ this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, (BloomFilterWriteStore) pageStore);

model.setColumnStore(writeStore);
}
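Note on the cast: in parquet-mr versions with bloom filter support, the column chunk page write store created above implements both `PageWriteStore` and `BloomFilterWriteStore` (an assumption from parquet-mr, not shown in this diff), so passing the same instance twice lets each column writer obtain a bloom filter writer alongside its page writer.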