@@ -19,6 +19,8 @@
package org.apache.iceberg.parquet;

import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundReference;
@@ -29,6 +31,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -119,10 +122,16 @@ public <T> FilterPredicate predicate(BoundPredicate<T> pred) {
BoundReference<T> ref = (BoundReference<T>) pred.term();
String path = schema.idToAlias(ref.fieldId());
Literal<T> lit;
+Set<T> litSet;
if (pred.isUnaryPredicate()) {
lit = null;
+litSet = null;
} else if (pred.isLiteralPredicate()) {
lit = pred.asLiteralPredicate().literal();
+litSet = null;
+} else if (pred.isSetPredicate()) {
+lit = null;
+litSet = pred.asSetPredicate().literalSet();
} else {
throw new UnsupportedOperationException("Cannot convert to Parquet filter: " + pred);
}
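For illustration, a minimal sketch of how the new set-predicate branch is exercised (assuming Iceberg's public Expressions and Binder APIs; not part of the diff):

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

// An IN expression with several distinct values binds to a set predicate,
// so the isSetPredicate() branch above populates litSet.
Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
Expression expr = Expressions.in("id", 1, 2, 3);
BoundPredicate<?> bound = (BoundPredicate<?>) Binder.bind(schema.asStruct(), expr, true);
// bound.isSetPredicate() -> true
// bound.asSetPredicate().literalSet() -> {1, 2, 3}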
@@ -139,21 +148,41 @@ public <T> FilterPredicate predicate(BoundPredicate<T> pred) {
break;
case INTEGER:
case DATE:
-return pred(op, FilterApi.intColumn(path), getParquetPrimitive(lit));
+return pred(
+op,
+FilterApi.intColumn(path),
+getParquetPrimitive(lit),
+getParquetPrimitiveSet(litSet));
case LONG:
case TIME:
case TIMESTAMP:
-return pred(op, FilterApi.longColumn(path), getParquetPrimitive(lit));
+return pred(
+op,
+FilterApi.longColumn(path),
+getParquetPrimitive(lit),
+getParquetPrimitiveSet(litSet));
case FLOAT:
-return pred(op, FilterApi.floatColumn(path), getParquetPrimitive(lit));
+return pred(
+op,
+FilterApi.floatColumn(path),
+getParquetPrimitive(lit),
+getParquetPrimitiveSet(litSet));
case DOUBLE:
-return pred(op, FilterApi.doubleColumn(path), getParquetPrimitive(lit));
+return pred(
+op,
+FilterApi.doubleColumn(path),
+getParquetPrimitive(lit),
+getParquetPrimitiveSet(litSet));
case STRING:
case UUID:
case FIXED:
case BINARY:
case DECIMAL:
-return pred(op, FilterApi.binaryColumn(path), getParquetPrimitive(lit));
+return pred(
+op,
+FilterApi.binaryColumn(path),
+getParquetPrimitive(lit),
+getParquetPrimitiveSet(litSet));
}

throw new UnsupportedOperationException("Cannot convert to Parquet filter: " + pred);
@@ -175,7 +204,7 @@ public <T> FilterPredicate predicate(UnboundPredicate<T> pred) {

@SuppressWarnings("checkstyle:MethodTypeParameterName")
private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
-FilterPredicate pred(Operation op, COL col, C value) {
+FilterPredicate pred(Operation op, COL col, C value, Set<C> valueSet) {
Member:
Is this how it's handled in Spark? I may be in the minority here, but I would prefer two separate pred functions, one taking a Set and the other a C, and calling only the one appropriate to the usage rather than having both arguments and passing different args based on the predicate type.

@jshmchenxi (Contributor, author), Sep 15, 2025:
Spark handles predicate pushdown in a more verbose way, basically:

predicate match {
    case OP1:
        parquetType match {
            case TYPE1: FilterApi.op1()
            case TYPE2: FilterApi.op1()
            ...
       }
    case OP2:
        parquetType match {
            case TYPE1: FilterApi.op2()
            case TYPE2: FilterApi.op2()
            ...
       }
    ...
}

Iceberg handles this more compactly: the same pred function already covers both unary and literal predicates. I wouldn't mind adding a new pred function, but it seems we already pass different arguments (none, or a C) based on the predicate type in pred.
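For comparison, a sketch of the two-function split suggested above (hypothetical code reusing this file's imports; the PR keeps a single pred):

@SuppressWarnings("checkstyle:MethodTypeParameterName")
private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
    FilterPredicate pred(Operation op, COL col, C value) {
  switch (op) {
    case EQ:
      return FilterApi.eq(col, value);
    case LT:
      return FilterApi.lt(col, value);
    // ... remaining unary and literal operations ...
    default:
      throw new UnsupportedOperationException("Unsupported predicate operation: " + op);
  }
}

@SuppressWarnings("checkstyle:MethodTypeParameterName")
private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
    FilterPredicate pred(Operation op, COL col, Set<C> valueSet) {
  switch (op) {
    case IN:
      return FilterApi.in(col, valueSet);
    case NOT_IN:
      return FilterApi.notIn(col, valueSet);
    default:
      throw new UnsupportedOperationException("Unsupported predicate operation: " + op);
  }
}

Overload resolution stays unambiguous because Set<C> is not itself Comparable, so each call site picks the variant matching the argument it has.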

switch (op) {
case IS_NULL:
return FilterApi.eq(col, null);
@@ -209,6 +238,10 @@ FilterPredicate pred(Operation op, COL col, C value) {
return FilterApi.lt(col, value);
case LT_EQ:
return FilterApi.ltEq(col, value);
+case IN:
+return FilterApi.in(col, valueSet);
+case NOT_IN:
+return FilterApi.notIn(col, valueSet);
default:
throw new UnsupportedOperationException("Unsupported predicate operation: " + op);
}
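On the Parquet side these map to FilterApi.in and FilterApi.notIn; a minimal usage sketch (assuming a parquet-mr version that ships the in/notIn predicates):

import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;

// Build the same kind of IN filter the cases above produce, then wrap it
// in the form Parquet readers accept.
IntColumn idCol = FilterApi.intColumn("id");
Set<Integer> ids = Sets.newHashSet(1, 2, 3);
FilterPredicate inPred = FilterApi.in(idCol, ids);
FilterCompat.Filter filter = FilterCompat.get(inPred);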
@@ -223,7 +256,7 @@ private static <C extends Comparable<C>> C getParquetPrimitive(Literal<?> lit) {
// TODO: this needs to convert to handle BigDecimal and UUID
Object value = lit.value();
if (value instanceof Number) {
-return (C) lit.value();
+return (C) value;
} else if (value instanceof CharSequence) {
return (C) Binary.fromString(value.toString());
} else if (value instanceof ByteBuffer) {
@@ -233,6 +266,29 @@ private static <C extends Comparable<C>> C getParquetPrimitive(Literal<?> lit) {
"Type not supported yet: " + value.getClass().getName());
}
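For the UUID half of that TODO, one possible shape (a sketch only, assuming the 16-byte big-endian layout Iceberg uses for UUIDs elsewhere; uuidToBinary is a hypothetical helper):

import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.parquet.io.api.Binary;

// Hypothetical helper: encode a UUID as the 16-byte binary Parquet compares against.
static Binary uuidToBinary(UUID uuid) {
  ByteBuffer buf = ByteBuffer.allocate(16);
  buf.putLong(uuid.getMostSignificantBits());
  buf.putLong(uuid.getLeastSignificantBits());
  buf.rewind();
  return Binary.fromConstantByteBuffer(buf);
}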

+@SuppressWarnings("unchecked")
+private static <C extends Comparable<C>> Set<C> getParquetPrimitiveSet(Set<?> litSet) {
+if (litSet == null) {
+return Collections.emptySet();
+}
+
+// TODO: this needs to convert to handle BigDecimal and UUID
+Set<C> convertedSet = Sets.newHashSet();
Contributor:
Since we add everything from litSet to convertedSet, should we initialize it with the size parameter, as in Sets.newHashSet(litSet)? It would be more efficient.
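A sketch of that pre-sizing: note that Sets.newHashSet(litSet) would also copy the raw, unconverted values into the set, so a pure size hint (assuming Guava's newHashSetWithExpectedSize, which the relocated Guava also exposes) is the closer fit:

// Pre-size to litSet.size() so the conversion loop below never rehashes,
// without inserting the unconverted values themselves.
Set<C> convertedSet = Sets.newHashSetWithExpectedSize(litSet.size());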

+for (Object value : litSet) {
+if (value instanceof Number) {
+convertedSet.add((C) value);
+} else if (value instanceof CharSequence) {
+convertedSet.add((C) Binary.fromString(value.toString()));
+} else if (value instanceof ByteBuffer) {
+convertedSet.add((C) Binary.fromReusedByteBuffer((ByteBuffer) value));
+} else {
+throw new UnsupportedOperationException(
+"Type not supported yet: " + value.getClass().getName());
+}
+}
+return convertedSet;
+}

private static class AlwaysTrue implements FilterPredicate {
static final AlwaysTrue INSTANCE = new AlwaysTrue();
