Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion api/src/main/java/org/apache/iceberg/expressions/Binder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -96,11 +97,29 @@ public static Set<Integer> boundReferences(StructType struct, List<Expression> e
}
ReferenceVisitor visitor = new ReferenceVisitor();
for (Expression expr : exprs) {
ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
if (isBound(expr)) {
ExpressionVisitors.visit(expr, visitor);
} else {
ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
}
}
return visitor.references;
}

/**
* Returns whether an expression is bound.
* <p>
* An expression is bound if all of its predicates are bound.
*
* @param expr an {@link Expression}
* @return true if the expression is bound
* @throws IllegalArgumentException if the expression has both bound and unbound predicates.
*/
public static boolean isBound(Expression expr) {
Boolean isBound = ExpressionVisitors.visit(expr, new IsBoundVisitor());
return isBound != null ? isBound : false; // assume unbound if undetermined
}

private static class BindVisitor extends ExpressionVisitor<Expression> {
private final StructType struct;
private final boolean caseSensitive;
Expand Down Expand Up @@ -180,4 +199,41 @@ public <T> Set<Integer> predicate(BoundPredicate<T> pred) {
return references;
}
}

private static class IsBoundVisitor extends ExpressionVisitor<Boolean> {
@Override
public Boolean not(Boolean result) {
return result;
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return combineResults(leftResult, rightResult);
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return combineResults(leftResult, rightResult);
}

@Override
public <T> Boolean predicate(BoundPredicate<T> pred) {
return true;
}

@Override
public <T> Boolean predicate(UnboundPredicate<T> pred) {
return false;
}

private Boolean combineResults(Boolean isLeftBound, Boolean isRightBound) {
if (isLeftBound != null) {
Preconditions.checkArgument(isRightBound == null || isLeftBound.equals(isRightBound),
"Found partially bound expression");
return isLeftBound;
} else {
return isRightBound;
}
}
}
}
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ private TableProperties() {
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";

public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
public static final String DELETE_AVRO_COMPRESSION = "write.delete.avro.compression-codec";
public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
Expand Down
17 changes: 13 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
Expand Down Expand Up @@ -281,7 +282,7 @@ public <D> FileAppender<D> build() throws IOException {
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
parquetProperties, metricsConfig, writeMode);
} else {
return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
ParquetWriteBuilder<D> parquetWriteBuilder = new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
.setType(type)
.setConfig(config)
Expand All @@ -291,9 +292,17 @@ public <D> FileAppender<D> build() throws IOException {
.withWriteMode(writeMode)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build(),
metricsConfig);
.withDictionaryPageSize(dictionaryPageSize);
// Todo: The following code needs to be improved in the bloom filter write path PR.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is okay for now to get this in with tests. Thanks, @huaxingao!

for (Map.Entry<String, String> entry : config.entrySet()) {
String key = entry.getKey();
if (key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX)) {
String columnPath = key.replaceFirst(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, "");
String value = entry.getValue();
parquetWriteBuilder.withBloomFilterEnabled(columnPath, Boolean.valueOf(value));
}
}
return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
}
}

Expand Down
Loading