diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 1cb50d379c04..22c5f1809e9d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -257,6 +257,7 @@ public static class ReadBuilder {
     private ReadSupport<?> readSupport = null;
     private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
     private boolean filterRecords = true;
+    private boolean caseSensitive = true;
     private Map<String, String> properties = Maps.newHashMap();
     private boolean callInit = false;
     private boolean reuseContainers = false;
@@ -283,6 +284,15 @@ public ReadBuilder project(Schema schema) {
       return this;
     }
 
+    public ReadBuilder caseInsensitive() {
+      return caseSensitive(false);
+    }
+
+    public ReadBuilder caseSensitive(boolean caseSensitive) {
+      this.caseSensitive = caseSensitive;
+      return this;
+    }
+
     public ReadBuilder filterRecords(boolean filterRecords) {
       this.filterRecords = filterRecords;
       return this;
@@ -339,7 +349,7 @@ public <D> CloseableIterable<D> build() {
         ParquetReadOptions options = optionsBuilder.build();
 
         return new org.apache.iceberg.parquet.ParquetReader<>(
-            file, schema, options, readerFunc, filter, reuseContainers);
+            file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
       }
 
       ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
@@ -374,7 +384,7 @@ public <D> CloseableIterable<D> build() {
         builder.useStatsFilter()
             .useDictionaryFilter()
             .useRecordFilter(filterRecords)
-            .withFilter(ParquetFilters.convert(fileSchema, filter));
+            .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
       } else {
         // turn off filtering
         builder.useStatsFilter(false)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 57f728ce39ff..58fe9021ecc0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -66,9 +66,13 @@ private EvalVisitor visitor() {
   }
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
+    this(schema, unbound, true);
+  }
+
+  public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
     this.schema = schema;
     this.struct = schema.asStruct();
-    this.expr = Binder.bind(struct, rewriteNot(unbound), true);
+    this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
   }
 
   /**
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index 84bfe2a8b4da..b4a675e04a3d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -29,7 +29,6 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.expressions.UnboundPredicate;
-import org.apache.iceberg.types.Types;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -40,19 +39,8 @@
 
 class ParquetFilters {
 
-  static FilterCompat.Filter convert(Schema schema, Expression expr) {
-    FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema));
-    // TODO: handle AlwaysFalse.INSTANCE
-    if (pred != null && pred != AlwaysTrue.INSTANCE) {
-      // FilterCompat will apply LogicalInverseRewriter
-      return FilterCompat.get(pred);
-    } else {
-      return FilterCompat.NOOP;
-    }
-  }
-
-  static FilterCompat.Filter convertColumnFilter(Schema schema, String column, Expression expr) {
-    FilterPredicate pred = visit(expr, new ConvertColumnFilterToParquet(schema, column));
+  static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) {
+    FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema, caseSensitive));
     // TODO: handle AlwaysFalse.INSTANCE
     if (pred != null && pred != AlwaysTrue.INSTANCE) {
       // FilterCompat will apply LogicalInverseRewriter
@@ -64,9 +52,11 @@ static FilterCompat.Filter convertColumnFilter(Schema schema, String column, Exp
 
   private static class ConvertFilterToParquet extends ExpressionVisitor<FilterPredicate> {
     private final Schema schema;
+    private final boolean caseSensitive;
 
-    private ConvertFilterToParquet(Schema schema) {
+    private ConvertFilterToParquet(Schema schema, boolean caseSensitive) {
       this.schema = schema;
+      this.caseSensitive = caseSensitive;
     }
 
     @Override
@@ -160,7 +150,7 @@ public <T> FilterPredicate predicate(BoundPredicate<T> pred) {
     }
 
     protected Expression bind(UnboundPredicate<?> pred) {
-      return pred.bind(schema.asStruct(), true);
+      return pred.bind(schema.asStruct(), caseSensitive);
     }
 
     @Override
@@ -178,21 +168,6 @@ public <T> FilterPredicate predicate(UnboundPredicate<T> pred) {
     }
   }
 
-  private static class ConvertColumnFilterToParquet extends ConvertFilterToParquet {
-    private final Types.StructType partitionStruct;
-
-    private ConvertColumnFilterToParquet(Schema schema, String column) {
-      super(schema);
-      this.partitionStruct = schema.findField(column).type().asNestedType().asStructType();
-    }
-
-    @Override
-    protected Expression bind(UnboundPredicate<?> pred) {
-      // instead of binding the predicate using the top-level schema, bind it to the partition data
-      return pred.bind(partitionStruct, true);
-    }
-  }
-
   private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
       FilterPredicate pred(Operation op, COL col, C value) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 9b66ed812835..4fe93dd291f7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -56,9 +56,13 @@ private MetricsEvalVisitor visitor() {
   }
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
+    this(schema, unbound, true);
+  }
+
+  public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
     this.schema = schema;
     this.struct = schema.asStruct();
-    this.expr = Binder.bind(struct, rewriteNot(unbound), true);
+    this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
   }
 
   /**
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 2dd930be84f2..653bb490dd61 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -49,10 +49,11 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private final Function<MessageType, ParquetValueReader<?>> readerFunc;
   private final Expression filter;
   private final boolean reuseContainers;
+  private final boolean caseSensitive;
 
   public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
                        Function<MessageType, ParquetValueReader<?>> readerFunc,
-                       Expression filter, boolean reuseContainers) {
+                       Expression filter, boolean reuseContainers, boolean caseSensitive) {
     this.input = input;
     this.expectedSchema = expectedSchema;
     this.options = options;
@@ -60,6 +61,7 @@ public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions
     // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
     this.filter = filter == Expressions.alwaysTrue() ? null : filter;
     this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
   }
 
   private static class ReadConf<T> {
@@ -75,7 +77,8 @@ private static class ReadConf<T> {
 
    @SuppressWarnings("unchecked")
     ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
-             Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers) {
+             Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
+             boolean caseSensitive) {
       this.file = file;
       this.options = options;
       this.reader = newReader(file, options);
@@ -95,8 +98,8 @@ private static class ReadConf<T> {
       ParquetMetricsRowGroupFilter statsFilter = null;
       ParquetDictionaryRowGroupFilter dictFilter = null;
       if (filter != null) {
-        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter);
-        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter);
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
       }
 
       long totalValues = 0L;
@@ -172,7 +175,7 @@ private static ParquetFileReader newReader(InputFile file, ParquetReadOptions op
   private ReadConf<T> init() {
     if (conf == null) {
       ReadConf<T> conf = new ReadConf<>(
-          input, options, expectedSchema, filter, readerFunc, reuseContainers);
+          input, options, expectedSchema, filter, readerFunc, reuseContainers, caseSensitive);
       this.conf = conf.copy();
       return conf;
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index 1523ac9a7301..20c455317fe8 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -470,4 +470,10 @@ public void testStringNotEq() {
     Assert.assertFalse("Should skip: contains only ''", shouldRead);
   }
 
+  @Test
+  public void testCaseInsensitive() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, notEqual("no_Nulls", ""), false)
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should skip: contains only ''", shouldRead);
+  }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
index 0a346668888e..41a091c62da9 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
@@ -460,4 +460,11 @@ public void testIntegerNotEqRewritten() {
         .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
   }
+
+  @Test
+  public void testCaseInsensitive() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, equal("ID", 5), false)
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 0fde726b6ba5..0770552deac5 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -461,6 +461,7 @@ private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
           .split(task.start(), task.length())
           .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
           .filter(task.residual())
+          .caseSensitive(caseSensitive)
          .build();
     }
   }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
index d2648e2fbd68..de98a4bbb310 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
@@ -25,15 +25,12 @@
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetAvroValueReaders;
-import org.apache.iceberg.parquet.ParquetReader;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -90,6 +87,7 @@ public void testStructSchema() throws IOException {
     );
 
     File testFile = writeTestData(structSchema, 5_000_000, 1059);
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(structSchema, "test");
 
     long sum = 0;
@@ -101,10 +99,11 @@ public void testStructSchema() throws IOException {
       // clean up as much memory as possible to avoid a large GC during the timed run
       System.gc();
 
-      try (ParquetReader<Record> reader = new ParquetReader<>(
-          Files.localInput(testFile), structSchema, ParquetReadOptions.builder().build(),
-          fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema),
-          Expressions.alwaysTrue(), true)) {
+      try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+          .project(structSchema)
+          .createReaderFunc(
+              fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema))
+          .build()) {
         long start = System.currentTimeMillis();
         long val = 0;
         long count = 0;
@@ -136,6 +135,7 @@ public void testStructSchema() throws IOException {
   @Ignore
   public void testWithOldReadPath() throws IOException {
     File testFile = writeTestData(COMPLEX_SCHEMA, 500_000, 1985);
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
 
     for (int i = 0; i < 5; i += 1) {
@@ -162,10 +162,11 @@ public void testWithOldReadPath() throws IOException {
       // clean up as much memory as possible to avoid a large GC during the timed run
       System.gc();
 
-      try (ParquetReader<Record> reader = new ParquetReader<>(
-          Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
-          fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
-          Expressions.alwaysTrue(), true)) {
+      try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+          .project(COMPLEX_SCHEMA)
+          .createReaderFunc(
+              fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+          .build()) {
         long start = System.currentTimeMillis();
         long val = 0;
         long count = 0;
@@ -195,13 +196,16 @@ public void testCorrectness() throws IOException {
       writer.addAll(records);
     }
 
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
 
     // verify that the new read path is correct
-    try (ParquetReader<Record> reader = new ParquetReader<>(
-        Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
-        fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
-        Expressions.alwaysTrue(), true)) {
+    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+        .project(COMPLEX_SCHEMA)
+        .createReaderFunc(
+            fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+        .reuseContainers()
+        .build()) {
       int i = 0;
       Iterator<Record> iter = records.iterator();
       for (Record actual : reader) {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
index 5562a1dc5709..411ed3259c2e 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
@@ -25,15 +25,13 @@
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetAvroValueReaders;
 import org.apache.iceberg.parquet.ParquetAvroWriter;
-import org.apache.iceberg.parquet.ParquetReader;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -87,13 +85,15 @@ public void testCorrectness() throws IOException {
       writer.addAll(records);
     }
 
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
 
     // verify that the new read path is correct
-    try (ParquetReader<Record> reader = new ParquetReader<>(
-        Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
-        fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
-        Expressions.alwaysTrue(), false)) {
+    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+        .project(COMPLEX_SCHEMA)
+        .createReaderFunc(
+            fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+        .build()) {
       int i = 0;
       Iterator<Record> iter = records.iterator();
       for (Record actual : reader) {
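
Usage note for reviewers: a minimal sketch of how a caller might exercise the new option. The file path, schema, and the mixed-case column reference below are hypothetical; the builder calls (`project`, `filter`, `caseSensitive`/`caseInsensitive`, `createReaderFunc`, `build`) are the ones added or already exposed by `Parquet.ReadBuilder` in this diff, and `ParquetAvroValueReaders.buildReader` is used the same way as in the tests above.

import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetAvroValueReaders;
import org.apache.iceberg.types.Types;

public class CaseSensitivityExample {
  // hypothetical schema: the field is declared with the lower-case name "id"
  private static final Schema SCHEMA = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()));

  public static void main(String[] args) throws Exception {
    // "ID" binds to "id" only because caseSensitive(false) is set; with the
    // default case-sensitive binding this filter would fail to resolve, and the
    // row-group filters above would never be consulted.
    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput("/tmp/data.parquet"))
        .project(SCHEMA)
        .filter(Expressions.equal("ID", 5L))
        .caseSensitive(false) // equivalent to .caseInsensitive()
        .createReaderFunc(fileSchema -> ParquetAvroValueReaders.buildReader(SCHEMA, fileSchema))
        .build()) {
      for (Record record : reader) {
        System.out.println(record);
      }
    }
  }
}

The flag flows from the builder into ParquetReader, which passes it to ParquetMetricsRowGroupFilter, ParquetDictionaryRowGroupFilter, and ParquetFilters.convert, so row-group pruning and record-level filtering all bind column names with the same sensitivity.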