From d3581f82daecdbf1384fd450d1cb982640263302 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Wed, 10 Sep 2025 10:58:05 +0800 Subject: [PATCH] Parquet: Support `in` predicate pushdown for ParquetFilters --- .../iceberg/parquet/ParquetFilters.java | 70 +++++++++++++++++-- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java index fc6febe19438..bc869fe797bc 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java @@ -19,6 +19,8 @@ package org.apache.iceberg.parquet; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundReference; @@ -29,6 +31,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; @@ -119,10 +122,16 @@ public FilterPredicate predicate(BoundPredicate pred) { BoundReference ref = (BoundReference) pred.term(); String path = schema.idToAlias(ref.fieldId()); Literal lit; + Set litSet; if (pred.isUnaryPredicate()) { lit = null; + litSet = null; } else if (pred.isLiteralPredicate()) { lit = pred.asLiteralPredicate().literal(); + litSet = null; + } else if (pred.isSetPredicate()) { + lit = null; + litSet = pred.asSetPredicate().literalSet(); } else { throw new UnsupportedOperationException("Cannot convert to Parquet filter: " + pred); } @@ -139,21 +148,41 @@ public FilterPredicate predicate(BoundPredicate pred) { break; case INTEGER: case DATE: - return pred(op, FilterApi.intColumn(path), getParquetPrimitive(lit)); + return pred( + op, + FilterApi.intColumn(path), + getParquetPrimitive(lit), + getParquetPrimitiveSet(litSet)); case LONG: case TIME: case TIMESTAMP: - return pred(op, FilterApi.longColumn(path), getParquetPrimitive(lit)); + return pred( + op, + FilterApi.longColumn(path), + getParquetPrimitive(lit), + getParquetPrimitiveSet(litSet)); case FLOAT: - return pred(op, FilterApi.floatColumn(path), getParquetPrimitive(lit)); + return pred( + op, + FilterApi.floatColumn(path), + getParquetPrimitive(lit), + getParquetPrimitiveSet(litSet)); case DOUBLE: - return pred(op, FilterApi.doubleColumn(path), getParquetPrimitive(lit)); + return pred( + op, + FilterApi.doubleColumn(path), + getParquetPrimitive(lit), + getParquetPrimitiveSet(litSet)); case STRING: case UUID: case FIXED: case BINARY: case DECIMAL: - return pred(op, FilterApi.binaryColumn(path), getParquetPrimitive(lit)); + return pred( + op, + FilterApi.binaryColumn(path), + getParquetPrimitive(lit), + getParquetPrimitiveSet(litSet)); } throw new UnsupportedOperationException("Cannot convert to Parquet filter: " + pred); @@ -175,7 +204,7 @@ public FilterPredicate predicate(UnboundPredicate pred) { @SuppressWarnings("checkstyle:MethodTypeParameterName") private static , COL extends Operators.Column & Operators.SupportsLtGt> - FilterPredicate pred(Operation op, COL col, C value) { + FilterPredicate pred(Operation op, COL col, C value, Set valueSet) { switch (op) { case IS_NULL: return FilterApi.eq(col, null); @@ -209,6 +238,10 @@ FilterPredicate pred(Operation op, COL col, C value) { return FilterApi.lt(col, value); case LT_EQ: return FilterApi.ltEq(col, value); + case IN: + return FilterApi.in(col, valueSet); + case NOT_IN: + return FilterApi.notIn(col, valueSet); default: throw new UnsupportedOperationException("Unsupported predicate operation: " + op); } @@ -223,7 +256,7 @@ private static > C getParquetPrimitive(Literal lit) { // TODO: this needs to convert to handle BigDecimal and UUID Object value = lit.value(); if (value instanceof Number) { - return (C) lit.value(); + return (C) value; } else if (value instanceof CharSequence) { return (C) Binary.fromString(value.toString()); } else if (value instanceof ByteBuffer) { @@ -233,6 +266,29 @@ private static > C getParquetPrimitive(Literal lit) { "Type not supported yet: " + value.getClass().getName()); } + @SuppressWarnings("unchecked") + private static > Set getParquetPrimitiveSet(Set litSet) { + if (litSet == null) { + return Collections.emptySet(); + } + + // TODO: this needs to convert to handle BigDecimal and UUID + Set convertedSet = Sets.newHashSet(); + for (Object value : litSet) { + if (value instanceof Number) { + convertedSet.add((C) value); + } else if (value instanceof CharSequence) { + convertedSet.add((C) Binary.fromString(value.toString())); + } else if (value instanceof ByteBuffer) { + convertedSet.add((C) Binary.fromReusedByteBuffer((ByteBuffer) value)); + } else { + throw new UnsupportedOperationException( + "Type not supported yet: " + value.getClass().getName()); + } + } + return convertedSet; + } + private static class AlwaysTrue implements FilterPredicate { static final AlwaysTrue INSTANCE = new AlwaysTrue();