parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java (12 additions & 2 deletions)
@@ -257,6 +257,7 @@ public static class ReadBuilder {
private ReadSupport<?> readSupport = null;
private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
private boolean filterRecords = true;
private boolean caseSensitive = true;
private Map<String, String> properties = Maps.newHashMap();
private boolean callInit = false;
private boolean reuseContainers = false;
@@ -283,6 +284,15 @@ public ReadBuilder project(Schema schema) {
return this;
}

public ReadBuilder caseInsensitive() {
return caseSensitive(false);
}

public ReadBuilder caseSensitive(boolean caseSensitive) {
this.caseSensitive = caseSensitive;
return this;
}

public ReadBuilder filterRecords(boolean filterRecords) {
this.filterRecords = filterRecords;
return this;
@@ -339,7 +349,7 @@ public <D> CloseableIterable<D> build() {
ParquetReadOptions options = optionsBuilder.build();

return new org.apache.iceberg.parquet.ParquetReader<>(
file, schema, options, readerFunc, filter, reuseContainers);
file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
}

ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
@@ -374,7 +384,7 @@ public <D> CloseableIterable<D> build() {
builder.useStatsFilter()
.useDictionaryFilter()
.useRecordFilter(filterRecords)
.withFilter(ParquetFilters.convert(fileSchema, filter));
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
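For illustration, a minimal sketch of how a caller might combine the new option with the existing builder methods; testFile and projection are placeholder inputs, and the reader function follows the Avro value readers used in the tests later in this diff.

// Sketch only: testFile (a local File) and projection (an Iceberg Schema) are assumed to exist.
try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
    .project(projection)
    .createReaderFunc(fileSchema -> ParquetAvroValueReaders.buildReader(projection, fileSchema))
    .filter(Expressions.equal("ID", 34))  // with case-insensitive binding this resolves to column "id"
    .caseInsensitive()                    // shorthand for caseSensitive(false)
    .build()) {
  for (Record record : reader) {
    // row groups that cannot match the filter are skipped before records are read
  }
}

The boolean form, caseSensitive(flag), is what integrations use to pass a configured setting through, as the Spark change later in this diff does.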
parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -66,9 +66,13 @@ private EvalVisitor visitor() {
}

public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
this(schema, unbound, true);
}

public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
this.schema = schema;
this.struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound), true);
this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
}

/**
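A minimal sketch of the new overload, with placeholder variable names; the shouldRead arguments (file schema, row-group metadata, dictionary page store) follow the test added near the end of this diff.

// With caseSensitive = false, a predicate on "no_Nulls" binds even if the schema
// declares the column with different casing; schema, parquetSchema, rowGroupMetadata,
// and dictionaryStore are placeholders.
ParquetDictionaryRowGroupFilter filter =
    new ParquetDictionaryRowGroupFilter(schema, Expressions.notEqual("no_Nulls", ""), false);
boolean shouldRead = filter.shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);

The two-argument constructor delegates to this one with caseSensitive = true, so existing callers keep their behavior.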
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -29,7 +29,6 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.types.Types;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -40,19 +39,8 @@

class ParquetFilters {

static FilterCompat.Filter convert(Schema schema, Expression expr) {
FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema));
// TODO: handle AlwaysFalse.INSTANCE
if (pred != null && pred != AlwaysTrue.INSTANCE) {
// FilterCompat will apply LogicalInverseRewriter
return FilterCompat.get(pred);
} else {
return FilterCompat.NOOP;
}
}

static FilterCompat.Filter convertColumnFilter(Schema schema, String column, Expression expr) {
FilterPredicate pred = visit(expr, new ConvertColumnFilterToParquet(schema, column));
static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) {
FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema, caseSensitive));
// TODO: handle AlwaysFalse.INSTANCE
if (pred != null && pred != AlwaysTrue.INSTANCE) {
// FilterCompat will apply LogicalInverseRewriter
@@ -64,9 +52,11 @@ static FilterCompat.Filter convertColumnFilter(Schema schema, String column, Exp

private static class ConvertFilterToParquet extends ExpressionVisitor<FilterPredicate> {
private final Schema schema;
private final boolean caseSensitive;

private ConvertFilterToParquet(Schema schema) {
private ConvertFilterToParquet(Schema schema, boolean caseSensitive) {
this.schema = schema;
this.caseSensitive = caseSensitive;
}

@Override
@@ -160,7 +150,7 @@ public <T> FilterPredicate predicate(BoundPredicate<T> pred) {
}

protected Expression bind(UnboundPredicate<?> pred) {
return pred.bind(schema.asStruct(), true);
return pred.bind(schema.asStruct(), caseSensitive);
}

@Override
@@ -178,21 +168,6 @@ public <T> FilterPredicate predicate(UnboundPredicate<T> pred) {
}
}

private static class ConvertColumnFilterToParquet extends ConvertFilterToParquet {
private final Types.StructType partitionStruct;

private ConvertColumnFilterToParquet(Schema schema, String column) {
super(schema);
this.partitionStruct = schema.findField(column).type().asNestedType().asStructType();
}

@Override
protected Expression bind(UnboundPredicate<?> pred) {
// instead of binding the predicate using the top-level schema, bind it to the partition data
return pred.bind(partitionStruct, true);
}
}

private static
<C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
FilterPredicate pred(Operation op, COL col, C value) {
parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -56,9 +56,13 @@ private MetricsEvalVisitor visitor() {
}

public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
this(schema, unbound, true);
}

public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
this.schema = schema;
this.struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound), true);
this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
}

/**
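The metrics filter follows the same pattern; a short sketch with placeholder names:

// Existing two-argument callers keep the old, case-sensitive binding.
ParquetMetricsRowGroupFilter defaultFilter = new ParquetMetricsRowGroupFilter(schema, filterExpr);
// New callers opt into case-insensitive binding explicitly.
boolean shouldRead = new ParquetMetricsRowGroupFilter(schema, filterExpr, false)
    .shouldRead(parquetSchema, rowGroupMetadata);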
parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -49,17 +49,19 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final Function<MessageType, ParquetValueReader<?>> readerFunc;
private final Expression filter;
private final boolean reuseContainers;
private final boolean caseSensitive;

public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
Function<MessageType, ParquetValueReader<?>> readerFunc,
Expression filter, boolean reuseContainers) {
Expression filter, boolean reuseContainers, boolean caseSensitive) {
this.input = input;
this.expectedSchema = expectedSchema;
this.options = options;
this.readerFunc = readerFunc;
// replace alwaysTrue with null to avoid extra work evaluating a trivial filter
this.filter = filter == Expressions.alwaysTrue() ? null : filter;
this.reuseContainers = reuseContainers;
this.caseSensitive = caseSensitive;
}

private static class ReadConf<T> {
@@ -75,7 +77,8 @@ private static class ReadConf<T> {

@SuppressWarnings("unchecked")
ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers) {
Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
boolean caseSensitive) {
this.file = file;
this.options = options;
this.reader = newReader(file, options);
@@ -95,8 +98,8 @@ private static class ReadConf<T> {
ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
if (filter != null) {
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter);
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
}

long totalValues = 0L;
@@ -172,7 +175,7 @@ private static ParquetFileReader newReader(InputFile file, ParquetReadOptions op
private ReadConf<T> init() {
if (conf == null) {
ReadConf<T> conf = new ReadConf<>(
input, options, expectedSchema, filter, readerFunc, reuseContainers);
input, options, expectedSchema, filter, readerFunc, reuseContainers, caseSensitive);
this.conf = conf.copy();
return conf;
}
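For completeness, a sketch of the widened constructor with placeholder arguments; the Parquet.read builder above now passes caseSensitive through, and the tests later in this diff were migrated onto that builder rather than constructing ParquetReader directly.

// inputFile, expectedSchema, readerFunc, and filterExpr are placeholders.
ParquetReader<Record> reader = new ParquetReader<>(
    inputFile, expectedSchema, ParquetReadOptions.builder().build(),
    readerFunc, filterExpr, true /* reuseContainers */, false /* caseSensitive */);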
Changes to the ParquetDictionaryRowGroupFilter tests
@@ -470,4 +470,10 @@ public void testStringNotEq() {
Assert.assertFalse("Should skip: contains only ''", shouldRead);
}

@Test
public void testCaseInsensitive() {
boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, notEqual("no_Nulls", ""), false)
.shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
Assert.assertFalse("Should skip: contains only ''", shouldRead);
}
}
Changes to the ParquetMetricsRowGroupFilter tests
@@ -460,4 +460,11 @@ public void testIntegerNotEqRewritten() {
.shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
}

@Test
public void testCaseInsensitive() {
boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, equal("ID", 5), false)
.shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
Assert.assertFalse("Should not read: id below lower bound", shouldRead);
}
}
Changes to the Spark Parquet read path
@@ -461,6 +461,7 @@ private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
.split(task.start(), task.length())
.createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
.filter(task.residual())
.caseSensitive(caseSensitive)
.build();
}
}
Changes to the Parquet Avro reader tests
@@ -25,15 +25,12 @@
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetAvroValueReaders;
import org.apache.iceberg.parquet.ParquetReader;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Ignore;
@@ -90,6 +87,7 @@ public void testStructSchema() throws IOException {
);

File testFile = writeTestData(structSchema, 5_000_000, 1059);
// RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(structSchema, "test");

long sum = 0;
@@ -101,10 +99,11 @@
// clean up as much memory as possible to avoid a large GC during the timed run
System.gc();

try (ParquetReader<Record> reader = new ParquetReader<>(
Files.localInput(testFile), structSchema, ParquetReadOptions.builder().build(),
fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema),
Expressions.alwaysTrue(), true)) {
try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
.project(structSchema)
.createReaderFunc(
fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema))
.build()) {
long start = System.currentTimeMillis();
long val = 0;
long count = 0;
@@ -136,6 +135,7 @@ public void testStructSchema() throws IOException {
@Ignore
public void testWithOldReadPath() throws IOException {
File testFile = writeTestData(COMPLEX_SCHEMA, 500_000, 1985);
// RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");

for (int i = 0; i < 5; i += 1) {
@@ -162,10 +162,11 @@
// clean up as much memory as possible to avoid a large GC during the timed run
System.gc();

try (ParquetReader<Record> reader = new ParquetReader<>(
Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
Expressions.alwaysTrue(), true)) {
try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
.project(COMPLEX_SCHEMA)
.createReaderFunc(
fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
.build()) {
long start = System.currentTimeMillis();
long val = 0;
long count = 0;
@@ -195,13 +196,16 @@ public void testCorrectness() throws IOException {
writer.addAll(records);
}

// RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");

// verify that the new read path is correct
try (ParquetReader<Record> reader = new ParquetReader<>(
Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
Expressions.alwaysTrue(), true)) {
try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
.project(COMPLEX_SCHEMA)
.createReaderFunc(
fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
.reuseContainers()
.build()) {
int i = 0;
Iterator<Record> iter = records.iterator();
for (Record actual : reader) {
Changes to the Parquet Avro writer tests
@@ -25,15 +25,13 @@
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetAvroValueReaders;
import org.apache.iceberg.parquet.ParquetAvroWriter;
import org.apache.iceberg.parquet.ParquetReader;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Rule;
@@ -87,13 +85,15 @@ public void testCorrectness() throws IOException {
writer.addAll(records);
}

// RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");

// verify that the new read path is correct
try (ParquetReader<Record> reader = new ParquetReader<>(
Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
Expressions.alwaysTrue(), false)) {
try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
.project(COMPLEX_SCHEMA)
.createReaderFunc(
fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
.build()) {
int i = 0;
Iterator<Record> iter = records.iterator();
for (Record actual : reader) {