11 changes: 10 additions & 1 deletion api/src/main/java/org/apache/iceberg/expressions/Binder.java
@@ -91,12 +91,21 @@ static Expression bind(StructType struct,
}

public static Set<Integer> boundReferences(StructType struct, List<Expression> exprs, boolean caseSensitive) {
return exprReferences(struct, exprs, caseSensitive, false);
}

public static Set<Integer> exprReferences(
StructType struct, List<Expression> exprs, boolean caseSensitive, boolean alreadyBound) {
Contributor: I don't think it is a good idea to add a second boolean argument to this method. It is confusing enough with just one. How about using a different method name for this, and then renaming this to be a private internal implementation?

Contributor (author): I changed this method name to exprReferences (please let me know if you have a better name), but I still need to keep this as public because I need to access this method from ParquetBloomRowGroupFilter, which is in a different package.

Contributor: Maybe just references, since it doesn't bind?

Contributor (author): Sounds good. Will change.

if (exprs == null) {
return ImmutableSet.of();
}
ReferenceVisitor visitor = new ReferenceVisitor();
for (Expression expr : exprs) {
ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
if (!alreadyBound) {
ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
} else {
ExpressionVisitors.visit(expr, visitor);
}
}
return visitor.references;
}
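To make the suggestion in the thread above concrete, here is a minimal sketch of the split the reviewer seems to have in mind, reusing identifiers from this diff. The method names (boundReferences and references) follow the reviewer's suggestion and are illustrative, not necessarily the code as merged:

// Sketch only: keep bind-then-collect in boundReferences, and add a separate method
// that only walks expressions that are already bound.
public static Set<Integer> boundReferences(StructType struct, List<Expression> exprs, boolean caseSensitive) {
  if (exprs == null) {
    return ImmutableSet.of();
  }
  ReferenceVisitor visitor = new ReferenceVisitor();
  for (Expression expr : exprs) {
    // bind against the schema first, then record the bound field ids
    ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
  }
  return visitor.references;
}

// collects field ids from expressions that are already bound, so no schema is needed
public static Set<Integer> references(List<Expression> exprs) {
  if (exprs == null) {
    return ImmutableSet.of();
  }
  ReferenceVisitor visitor = new ReferenceVisitor();
  for (Expression expr : exprs) {
    ExpressionVisitors.visit(expr, visitor);
  }
  return visitor.references;
}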
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -167,6 +167,11 @@ private TableProperties() {
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";

public static final String PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes";
public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;

public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
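As a usage sketch (not part of this diff), these properties could be set on an existing table through Iceberg's UpdateProperties API. The catalog handle, table identifier, and column name below are placeholders for illustration:

// Hypothetical usage: enable a Parquet bloom filter for column "col1" and cap the bitset at 1 MB.
// Assumes an org.apache.iceberg.Table obtained from a catalog, e.g.:
Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
table.updateProperties()
    .set("write.parquet.bloom-filter-enabled.column.col1", "true")   // per-column toggle
    .set("write.parquet.bloom-filter-max-bytes", "1048576")          // 1 MB cap (the default)
    .commit();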
4 changes: 3 additions & 1 deletion docs/tables/configuration.md
@@ -49,7 +49,9 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
| write.parquet.compression-level | null | Parquet compression level
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Enables writing a bloom filter for the column |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
Contributor: What is the behavior of this? If the NDV requires a size that is too large, does it skip writing the bloom filter?

Contributor (author): If the NDV requires a size that is too large, Parquet still writes the bloom filter using the max bytes set by this property, rather than the bitset size calculated from the NDV.

Contributor: I guess there probably isn't much we can do about this, although that behavior makes no sense to me. Is it possible to set the expected false positive probability anywhere? Or is that hard-coded in the Parquet library?

Contributor (author): There isn't a property to set the fpp in Parquet.

Contributor: What fpp is used by Parquet?

Contributor (author): Parquet uses 0.01 for the fpp. I chatted offline with @chenjunjiedada; we can probably add a config for fpp in Parquet first, and then in Iceberg.

| write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed |
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
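The thread above asks how the NDV, the fpp, and this max-bytes cap interact. As a rough illustration (an assumed helper, not parquet-mr's actual sizing code, which uses a split-block bloom filter with a slightly different formula), the classic sizing bound is m = -n * ln(p) / (ln 2)^2 bits for n distinct values at false positive probability p, and the cap applies on top of that:

// Illustrative sketch only: approximate bloom filter size from NDV and fpp,
// capped at write.parquet.bloom-filter-max-bytes.
static int approxBloomFilterBytes(long ndv, double fpp, int maxBytes) {
  // classic bloom filter bound: m = -n * ln(p) / (ln 2)^2 bits
  double bits = -ndv * Math.log(fpp) / (Math.log(2) * Math.log(2));
  long bytes = (long) Math.ceil(bits / 8.0);
  // if the NDV-derived size exceeds the cap, the filter is still written, just at maxBytes
  return (int) Math.min(bytes, maxBytes);
}
// e.g. approxBloomFilterBytes(1_000_000, 0.01, 1024 * 1024) computes roughly 1.2 MB of bits,
// which the default 1 MB cap then truncates.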
81 changes: 73 additions & 8 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Expand Up @@ -85,6 +85,9 @@
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -239,6 +242,8 @@ public <D> FileAppender<D> build() throws IOException {
CompressionCodecName codec = context.codec();
int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();

if (compressionLevel != null) {
switch (codec) {
@@ -269,19 +274,27 @@ public <D> FileAppender<D> build() throws IOException {
conf.set(entry.getKey(), entry.getValue());
}

ParquetProperties parquetProperties = ParquetProperties.builder()
ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
.build();
.withMaxBloomFilterBytes(bloomFilterMaxBytes);

for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = entry.getKey();
String bloomEnabled = entry.getValue();
propsBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

ParquetProperties parquetProperties = propsBuilder.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
parquetProperties, metricsConfig, writeMode);
} else {
return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
ParquetWriteBuilder<D> parquetWriteBuilder = new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
.setType(type)
.setConfig(config)
@@ -291,12 +304,32 @@ public <D> FileAppender<D> build() throws IOException {
.withWriteMode(writeMode)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build(),
.withDictionaryPageSize(dictionaryPageSize);

for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = entry.getKey();
String bloomEnabled = entry.getValue();
parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

return new ParquetWriteAdapter<>(
Contributor: @aokolnychyi, what do you think about removing the old ParquetWriteAdapter code? I don't think that anyone uses it anymore.

parquetWriteBuilder.build(),
metricsConfig);
}
}

private static Map<String, String> bloomColumnConfigMap(String prefix, Map<String, String> config) {
Map<String, String> columnBloomFilterConfig = Maps.newHashMap();
config.keySet().stream()
.filter(key -> key.startsWith(prefix))
.forEach(key -> {
String columnPath = key.replaceFirst(prefix, "");
Contributor: Since this uses the column name in the config, is there any logic to update these configs when columns are renamed?

Contributor (author): Good question. I don't have logic to update the configs when columns are renamed, but I think we are OK. On the write path, I use these configs to write bloom filters at file creation time; I don't use them again for reads. On the read path, the bloom filters are loaded using the field id instead of the column name, so if columns are renamed after the bloom filters have been written, as long as the ids stay the same, the bloom filters should still load correctly.

Contributor: We're okay for adding read support, but we should consider how to configure write support then.

String bloomFilterMode = config.get(key);
columnBloomFilterConfig.put(columnPath, bloomFilterMode);
});
return columnBloomFilterConfig;
}
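For illustration (hypothetical keys, not from this diff), bloomColumnConfigMap strips the property prefix so only the column path remains:

// hypothetical input/output of bloomColumnConfigMap with the bloom-filter prefix:
//   config: "write.parquet.bloom-filter-enabled.column.user_id" -> "true",
//           "write.parquet.compression-codec"                   -> "zstd"
//   result: "user_id" -> "true"   (non-matching keys are ignored)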

private static class Context {
private final int rowGroupSize;
private final int pageSize;
@@ -305,17 +338,23 @@ private static class Context {
private final String compressionLevel;
private final int rowGroupCheckMinRecordCount;
private final int rowGroupCheckMaxRecordCount;
private final int bloomFilterMaxBytes;
private final Map<String, String> columnBloomFilterEnabled;

private Context(int rowGroupSize, int pageSize, int dictionaryPageSize,
CompressionCodecName codec, String compressionLevel,
int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount) {
int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount,
int bloomFilterMaxBytes,
Map<String, String> columnBloomFilterEnabled) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.dictionaryPageSize = dictionaryPageSize;
this.codec = codec;
this.compressionLevel = compressionLevel;
this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
this.bloomFilterMaxBytes = bloomFilterMaxBytes;
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
}

static Context dataContext(Map<String, String> config) {
@@ -348,8 +387,16 @@ static Context dataContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "Bloom filter max bytes must be > 0");

Map<String, String> columnBloomFilterEnabled =
bloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, config);

return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterMaxBytes,
columnBloomFilterEnabled);
}

static Context deleteContext(Map<String, String> config) {
@@ -385,8 +432,16 @@ static Context deleteContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "Bloom filter max bytes must be > 0");

Map<String, String> columnBloomFilterEnabled =
bloomColumnConfigMap(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, config);

return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterMaxBytes,
columnBloomFilterEnabled);
}

private static CompressionCodecName toCodec(String codecAsString) {
@@ -424,6 +479,14 @@ int rowGroupCheckMinRecordCount() {
int rowGroupCheckMaxRecordCount() {
return rowGroupCheckMaxRecordCount;
}

int bloomFilterMaxBytes() {
return bloomFilterMaxBytes;
}

Map<String, String> columnBloomFilterEnabled() {
return columnBloomFilterEnabled;
}
}
}

@@ -903,12 +966,14 @@ public <D> CloseableIterable<D> build() {
Schema fileSchema = ParquetSchemaUtil.convert(type);
builder.useStatsFilter()
.useDictionaryFilter()
.useBloomFilter()
.useRecordFilter(filterRecords)
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
.useDictionaryFilter(false)
.useBloomFilter(false)
.useRecordFilter(false);
}
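With useBloomFilter() enabled on the read path, a bound equality predicate can prove that a value is absent from a row group and skip it entirely. A hedged usage sketch of Iceberg's Parquet read builder follows; the input file, schema, and predicate are placeholders, and GenericParquetReaders comes from the iceberg-data module:

// Sketch: read records with a predicate that bloom filters can use to prune row groups.
CloseableIterable<Record> rows = Parquet.read(inputFile)          // org.apache.iceberg.io.InputFile
    .project(schema)                                              // expected Iceberg schema
    .filter(Expressions.equal("user_id", 42L))                    // candidate for bloom filter pruning
    .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
    .build();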
