diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index b82d6eb663d5..f03544dc0e3f 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -812,6 +812,15 @@ public void testInLimitParquet() {
     Assert.assertTrue("Should read if IN is not evaluated", shouldRead);
   }
 
+  @Test
+  public void testParquetTypePromotion() {
+    Assume.assumeTrue("Only valid for Parquet", format == FileFormat.PARQUET);
+    Schema promotedSchema = new Schema(required(1, "id", Types.LongType.get()));
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
+
   private boolean shouldRead(Expression expression) {
     return shouldRead(expression, true);
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index 431c636e4086..78b3a31cebfd 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -68,6 +68,21 @@ static Literal<?> fromParquetPrimitive(Type type, PrimitiveType parquetType,
     }
   }
 
+  static Function<Object, Object> converterFromParquet(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
+
   static Function<Object, Object> converterFromParquet(PrimitiveType type) {
     if (type.getOriginalType() != null) {
       switch (type.getOriginalType()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index d72cf49ee503..37c7d6e513f5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -37,6 +37,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.NaNUtil;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -49,6 +50,7 @@
 import org.apache.parquet.schema.PrimitiveType;
 
 public class ParquetDictionaryRowGroupFilter {
+  private final Schema schema;
   private final Expression expr;
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
@@ -56,6 +58,7 @@ public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
   }
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
+    this.schema = schema;
     StructType struct = schema.asStruct();
     this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), caseSensitive);
   }
@@ -96,8 +99,9 @@ private boolean eval(MessageType fileSchema, BlockMetaData rowGroup,
         PrimitiveType colType = fileSchema.getType(desc.getPath()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           cols.put(id, desc);
-          conversions.put(id, ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
         }
       }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index ee026cc38100..f83d70100443 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -77,7 +77,7 @@ public boolean shouldRead(MessageType fileSchema, BlockMetaData rowGroup) {
   private static final boolean ROWS_CANNOT_MATCH = false;
 
   private class MetricsEvalVisitor extends BoundExpressionVisitor<Boolean> {
-    private Map<Integer, Statistics> stats = null;
+    private Map<Integer, Statistics<?>> stats = null;
     private Map<Integer, Long> valueCounts = null;
     private Map<Integer, Function<Object, Object>> conversions = null;
 
@@ -93,9 +93,10 @@ private boolean eval(MessageType fileSchema, BlockMetaData rowGroup) {
         PrimitiveType colType = fileSchema.getType(col.getPath().toArray()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           stats.put(id, col.getStatistics());
           valueCounts.put(id, col.getValueCount());
-          conversions.put(id, ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
         }
       }
 
@@ -502,4 +503,19 @@ static boolean hasNonNullButNoMinMax(Statistics statistics, long valueCount) {
     return statistics.getNumNulls() < valueCount &&
         (statistics.getMaxBytes() == null || statistics.getMinBytes() == null);
   }
+
+  private static Function<Object, Object> converterFor(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = ParquetConversions.converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index a5e7a353093d..c02bf271f749 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -884,4 +884,12 @@ public void testIntegerNotIn() {
         .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
     Assert.assertFalse("Should not read: notIn on no nulls column (empty string is within the set)", shouldRead);
   }
+
+  @Test
+  public void testTypePromotion() {
+    Schema promotedSchema = new Schema(required(1, "id", LongType.get()));
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
 }