diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index fc35adc63f18..8febb7205cac 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -46,6 +46,7 @@
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
@@ -582,6 +583,12 @@ public <D> CloseableIterable<D> build() {
         optionsBuilder = ParquetReadOptions.builder();
       }
 
+      if (filter != null && !filter.equals(Expressions.alwaysTrue()) &&
+          ParquetFilters.isSupportedFilter(filter)) {
+        optionsBuilder.useRecordFilter(filterRecords);
+        optionsBuilder.withRecordFilter(ParquetFilters.convert(getSchemaFromFile(), filter, caseSensitive));
+      }
+
       for (Map.Entry<String, String> entry : properties.entrySet()) {
         optionsBuilder.set(entry.getKey(), entry.getValue());
       }
@@ -623,17 +630,10 @@ public <D> CloseableIterable<D> build() {
       if (filter != null) {
         // TODO: should not need to get the schema to push down before opening the file.
         // Parquet should allow setting a filter inside its read support
-        MessageType type;
-        try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
-          type = schemaReader.getFileMetaData().getSchema();
-        } catch (IOException e) {
-          throw new RuntimeIOException(e);
-        }
-        Schema fileSchema = ParquetSchemaUtil.convert(type);
         builder.useStatsFilter()
             .useDictionaryFilter()
             .useRecordFilter(filterRecords)
-            .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
+            .withFilter(ParquetFilters.convert(getSchemaFromFile(), filter, caseSensitive));
       } else {
         // turn off filtering
         builder.useStatsFilter(false)
@@ -655,6 +655,16 @@ public <D> CloseableIterable<D> build() {
 
       return new ParquetIterable<>(builder);
     }
+
+    private Schema getSchemaFromFile() {
+      MessageType type;
+      try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
+        type = schemaReader.getFileMetaData().getSchema();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+      return ParquetSchemaUtil.convert(type);
+    }
   }
 
   private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
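Reviewer note: a minimal, hypothetical usage sketch of the read path this hunk changes, not part of the patch. The file path and schema are made up; only existing ReadBuilder calls are used. When the pushed-down expression passes ParquetFilters.isSupportedFilter, the builder now also converts it to a Parquet record filter on the read options, which later enables page-level pruning via readNextFilteredRowGroup().

import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

public class ReadWithPushdownSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "intCol", Types.IntegerType.get()));
    // Push an Iceberg expression down to the Parquet reader; with this change, a supported
    // filter is additionally registered as a Parquet record filter on the read options.
    try (CloseableIterable<Record> rows = Parquet.read(Files.localInput("/tmp/data.parquet"))
        .project(schema)
        .filterRecords(true)
        .filter(Expressions.equal("intCol", 519530))
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build()) {
      rows.forEach(row -> System.out.println(row));
    }
  }
}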
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index d3e3df3a86d5..0ebfdba64ed6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -20,7 +20,9 @@
 package org.apache.iceberg.parquet;
 
 import java.nio.ByteBuffer;
+import java.util.Set;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.And;
 import org.apache.iceberg.expressions.BoundPredicate;
 import org.apache.iceberg.expressions.BoundReference;
 import org.apache.iceberg.expressions.Expression;
@@ -29,7 +31,10 @@
 import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.expressions.Not;
+import org.apache.iceberg.expressions.Or;
 import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -38,9 +43,37 @@ class ParquetFilters {
 
+  private static final Set<Operation> SUPPORTED_OPS = ImmutableSet.of(
+      Operation.IS_NULL,
+      Operation.NOT_NULL,
+      Operation.EQ,
+      Operation.NOT_EQ,
+      Operation.GT,
+      Operation.GT_EQ,
+      Operation.LT,
+      Operation.LT_EQ);
+
   private ParquetFilters() {
   }
 
+  public static boolean isSupportedFilter(Expression expr) {
+    if (expr.op().equals(Operation.AND)) {
+      return isSupportedFilter(((And) expr).left()) &&
+          isSupportedFilter(((And) expr).right());
+    } else if (expr.op().equals(Operation.OR)) {
+      return isSupportedFilter(((Or) expr).left()) &&
+          isSupportedFilter(((Or) expr).right());
+    } else if (expr.op().equals(Operation.NOT)) {
+      return isSupportedFilter(((Not) expr).child());
+    } else {
+      return isSupportedOp(expr);
+    }
+  }
+
+  private static boolean isSupportedOp(Expression expr) {
+    return SUPPORTED_OPS.contains(expr.op());
+  }
+
   static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) {
     FilterPredicate pred = ExpressionVisitors.visit(expr, new ConvertFilterToParquet(schema, caseSensitive));
     // TODO: handle AlwaysFalse.INSTANCE
@@ -231,7 +264,7 @@ private static class AlwaysFalse implements FilterPredicate {
 
     @Override
     public <R> R accept(Visitor<R> visitor) {
-      throw new UnsupportedOperationException("AlwaysTrue is a placeholder only");
+      throw new UnsupportedOperationException("AlwaysFalse is a placeholder only");
     }
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 29e980def260..8f910cabbda9 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -81,11 +81,11 @@ public CloseableIterator<T> iterator() {
 
   private static class FileIterator<T> implements CloseableIterator<T> {
     private final ParquetFileReader reader;
-    private final boolean[] shouldSkip;
     private final ParquetValueReader<T> model;
     private final long totalValues;
     private final boolean reuseContainers;
     private final long[] rowGroupsStartRowPos;
+    private final boolean hasRecordFilter;
 
     private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
@@ -94,11 +94,11 @@ private static class FileIterator<T> implements CloseableIterator<T> {
 
     FileIterator(ReadConf<T> conf) {
       this.reader = conf.reader();
-      this.shouldSkip = conf.shouldSkip();
       this.model = conf.model();
       this.totalValues = conf.totalValues();
       this.reuseContainers = conf.reuseContainers();
       this.rowGroupsStartRowPos = conf.startRowPositions();
+      this.hasRecordFilter = conf.hasRecordFilter();
     }
 
     @Override
@@ -123,14 +123,14 @@ public T next() {
     }
 
     private void advance() {
-      while (shouldSkip[nextRowGroup]) {
-        nextRowGroup += 1;
-        reader.skipNextRowGroup();
-      }
-
       PageReadStore pages;
       try {
-        pages = reader.readNextRowGroup();
+        // Because of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup()
+        if (hasRecordFilter) {
+          pages = reader.readNextFilteredRowGroup();
+        } else {
+          pages = reader.readNextRowGroup();
+        }
       } catch (IOException e) {
         throw new RuntimeIOException(e);
       }
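To make the new ParquetFilters entry point concrete, a small hypothetical sketch (it would have to live in the org.apache.iceberg.parquet package, since the class is package-private). Trees built from the listed comparison and nullability predicates combined with AND, OR and NOT are accepted; anything else, such as STARTS_WITH or IN, is rejected, so the reader does not register a record filter and reads row groups without page-level pruning.

package org.apache.iceberg.parquet;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class IsSupportedFilterExamples {
  public static void main(String[] args) {
    Expression supported = Expressions.and(
        Expressions.notNull("intCol"),
        Expressions.equal("intCol", 519530));
    Expression unsupported = Expressions.startsWith("strCol", "abc");

    // Expected: true, because AND, NOT_NULL and EQ are all in SUPPORTED_OPS
    System.out.println(ParquetFilters.isSupportedFilter(supported));
    // Expected: false, because STARTS_WITH is not in SUPPORTED_OPS
    System.out.println(ParquetFilters.isSupportedFilter(unsupported));
  }
}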
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
index cb816da8ce0e..36212c8f645c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -54,7 +54,6 @@ class ReadConf<T> {
   private final ParquetValueReader<T> model;
   private final VectorizedReader<T> vectorizedModel;
   private final List<BlockMetaData> rowGroups;
-  private final boolean[] shouldSkip;
   private final long totalValues;
   private final boolean reuseContainers;
   private final Integer batchSize;
@@ -85,34 +84,28 @@ class ReadConf<T> {
       this.projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
     }
 
+    // ParquetFileReader applies its filters (stats, dictionary and, in the future, bloom filter)
+    // in the constructor, so getRowGroups returns already-filtered row groups
     this.rowGroups = reader.getRowGroups();
-    this.shouldSkip = new boolean[rowGroups.size()];
 
     // Fetch all row groups starting positions to compute the row offsets of the filtered row groups
    Map<Long, Long> offsetToStartPos = generateOffsetToStartPos();
     this.startRowPositions = new long[rowGroups.size()];
 
-    ParquetMetricsRowGroupFilter statsFilter = null;
-    ParquetDictionaryRowGroupFilter dictFilter = null;
-    if (filter != null) {
-      statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
-      dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
-    }
-
     long computedTotalValues = 0L;
-    for (int i = 0; i < shouldSkip.length; i += 1) {
+    // If a row group has no matching rows after filtering, it is not added to rowGroups
+    for (int i = 0; i < rowGroups.size(); i += 1) {
       BlockMetaData rowGroup = rowGroups.get(i);
       startRowPositions[i] = offsetToStartPos.get(rowGroup.getStartingPos());
-      boolean shouldRead = filter == null || (
-          statsFilter.shouldRead(typeWithIds, rowGroup) &&
-          dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
-      this.shouldSkip[i] = !shouldRead;
-      if (shouldRead) {
-        computedTotalValues += rowGroup.getRowCount();
-      }
+      computedTotalValues += rowGroup.getRowCount();
     }
-    this.totalValues = computedTotalValues;
 
+    // Because of PARQUET-1901, we cannot blindly call getFilteredRecordCount()
+    if (filter != null) {
+      this.totalValues = reader.getFilteredRecordCount();
+    } else {
+      this.totalValues = computedTotalValues;
+    }
     if (readerFunc != null) {
       this.model = (ParquetValueReader<T>) readerFunc.apply(typeWithIds);
       this.vectorizedModel = null;
@@ -134,7 +127,6 @@ private ReadConf(ReadConf<T> toCopy) {
     this.projection = toCopy.projection;
     this.model = toCopy.model;
     this.rowGroups = toCopy.rowGroups;
-    this.shouldSkip = toCopy.shouldSkip;
     this.totalValues = toCopy.totalValues;
     this.reuseContainers = toCopy.reuseContainers;
     this.batchSize = toCopy.batchSize;
@@ -162,10 +154,6 @@ VectorizedReader<T> vectorizedModel() {
     return vectorizedModel;
   }
 
-  boolean[] shouldSkip() {
-    return shouldSkip;
-  }
-
   private Map<Long, Long> generateOffsetToStartPos() {
     try (ParquetFileReader fileReader = newReader(file, ParquetReadOptions.builder().build())) {
       Map<Long, Long> offsetToStartPos = new HashMap<>();
@@ -208,6 +196,10 @@ ReadConf<T> copy() {
     return new ReadConf<>(this);
   }
 
+  boolean hasRecordFilter() {
+    return options.getRecordFilter() != null;
+  }
+
   private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) {
     try {
       return ParquetFileReader.open(ParquetIO.file(file), options);
@@ -221,16 +213,12 @@ private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowGroups() {
         .map(columnDescriptor -> ColumnPath.get(columnDescriptor.getPath())).collect(Collectors.toSet());
     ImmutableList.Builder<Map<ColumnPath, ColumnChunkMetaData>> listBuilder = ImmutableList.builder();
     for (int i = 0; i < rowGroups.size(); i++) {
-      if (!shouldSkip[i]) {
-        BlockMetaData blockMetaData = rowGroups.get(i);
-        ImmutableMap.Builder<ColumnPath, ColumnChunkMetaData> mapBuilder = ImmutableMap.builder();
-        blockMetaData.getColumns().stream()
-            .filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath()))
-            .forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData));
-        listBuilder.add(mapBuilder.build());
-      } else {
-        listBuilder.add(ImmutableMap.of());
-      }
+      BlockMetaData blockMetaData = rowGroups.get(i);
+      ImmutableMap.Builder<ColumnPath, ColumnChunkMetaData> mapBuilder = ImmutableMap.builder();
+      blockMetaData.getColumns().stream()
+          .filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath()))
+          .forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData));
+      listBuilder.add(mapBuilder.build());
     }
     return listBuilder.build();
   }
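For reviewers less familiar with the parquet-mr side, a standalone sketch (hypothetical file path) of the reader calls that ReadConf and the iterators now coordinate. getFilteredRecordCount() and readNextFilteredRowGroup() are only meaningful when a record filter was set on the read options; because of PARQUET-1901 they cannot be called unconditionally, which is why ReadConf exposes hasRecordFilter().

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FilteredRowGroupSketch {
  public static void main(String[] args) throws IOException {
    FilterCompat.Filter recordFilter =
        FilterCompat.get(FilterApi.eq(FilterApi.intColumn("intCol"), 519530));
    ParquetReadOptions options = ParquetReadOptions.builder()
        .useColumnIndexFilter(true)       // prune pages using column indexes
        .useRecordFilter(true)
        .withRecordFilter(recordFilter)   // the filter Iceberg builds in ParquetFilters.convert
        .build();
    HadoopInputFile input =
        HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), new Configuration());
    try (ParquetFileReader reader = ParquetFileReader.open(input, options)) {
      // Both calls below assume a record filter is present (see PARQUET-1901)
      long matchingRows = reader.getFilteredRecordCount();
      PageReadStore pages = reader.readNextFilteredRowGroup();
      if (pages != null) {
        System.out.println(matchingRows + " matching rows; first row group exposes " +
            pages.getRowCount() + " rows after page pruning");
      }
    }
  }
}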
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
index 481012cb8bbe..5f5390bbe1d7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -89,12 +89,12 @@ public CloseableIterator<T> iterator() {
 
   private static class FileIterator<T> implements CloseableIterator<T> {
     private final ParquetFileReader reader;
-    private final boolean[] shouldSkip;
     private final VectorizedReader<T> model;
     private final long totalValues;
     private final int batchSize;
     private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
     private final boolean reuseContainers;
+    private final boolean hasRecordFilter;
     private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
     private long valuesRead = 0;
@@ -102,16 +102,15 @@ private static class FileIterator<T> implements CloseableIterator<T> {
 
     FileIterator(ReadConf<T> conf) {
       this.reader = conf.reader();
-      this.shouldSkip = conf.shouldSkip();
       this.totalValues = conf.totalValues();
       this.reuseContainers = conf.reuseContainers();
       this.model = conf.vectorizedModel();
       this.batchSize = conf.batchSize();
       this.model.setBatchSize(this.batchSize);
       this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
+      this.hasRecordFilter = conf.hasRecordFilter();
     }
-
     @Override
     public boolean hasNext() {
       return valuesRead < totalValues;
@@ -139,16 +138,18 @@ public T next() {
     }
 
     private void advance() {
-      while (shouldSkip[nextRowGroup]) {
-        nextRowGroup += 1;
-        reader.skipNextRowGroup();
-      }
       PageReadStore pages;
       try {
-        pages = reader.readNextRowGroup();
+        // Because of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup()
+        if (hasRecordFilter) {
+          pages = reader.readNextFilteredRowGroup();
+        } else {
+          pages = reader.readNextRowGroup();
+        }
       } catch (IOException e) {
         throw new RuntimeIOException(e);
       }
+
       model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
       nextRowGroupStart += pages.getRowCount();
       nextRowGroup += 1;
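The new test below depends on column indexes being written for the data pages. A quick hypothetical sketch of how one could confirm that with plain parquet-mr, independent of Iceberg (file path is made up):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

public class ColumnIndexInspector {
  public static void main(String[] args) throws IOException {
    HadoopInputFile input =
        HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), new Configuration());
    try (ParquetFileReader reader = ParquetFileReader.open(input)) {
      for (BlockMetaData rowGroup : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
          // Null means the writer produced no index for this chunk, so no page can be pruned
          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
          System.out.println(chunk.getPath() + ": " +
              (offsetIndex == null ? "no offset index" : offsetIndex.getPageCount() + " pages"));
        }
      }
    }
  }
}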
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java
new file mode 100644
index 000000000000..850b75ecccec
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.parquet.Parquet.ReadBuilder;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+public class TestParquetColumnIndex {
+
+  private final int recordCount = 1000000;
+  private final int recordPerPage = 100;
+  private final int recordPerRowGroup = 100000;
+  private final int lookupVal = 519530;
+
+  private MessageType parquetSchema = new MessageType("schema",
+      new PrimitiveType(REQUIRED, INT32, "intCol"));
+  private Schema schema = ParquetSchemaUtil.convert(parquetSchema);
+  private org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testColumnIndexFilter() throws IOException {
+    File parquetFile = generateFileWithSeq(ParquetAvroWriter::buildWriter);
+    readAndCompare(parquetFile, recordPerPage);
+  }
+
+  @Test
+  public void testColumnIndexFilterWithHole() throws IOException {
+    File parquetFile = generateFileWithSeqAndHole(ParquetAvroWriter::buildWriter);
+    readAndCompare(parquetFile, 0);
+  }
+
+  private void readAndCompare(File parquetFile, int expectedCount) {
+    int totalCount = getPageRecordCount(parquetFile, null);
+    int filterCount = getPageRecordCount(parquetFile,
+        Expressions.and(Expressions.notNull("intCol"), Expressions.equal("intCol", lookupVal)));
+    Assert.assertTrue(filterCount < totalCount);
+    Assert.assertEquals(expectedCount, filterCount);
+  }
+
+  private int getPageRecordCount(File parquetFile, Expression expr) {
+    List<Object> records = new ArrayList<>();
+    ReadBuilder builder = Parquet.read(localInput(parquetFile))
+        .project(schema)
+        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema));
+    if (expr != null) {
+      builder.filterRecords(true).filter(expr);
+    }
+    CloseableIterator<Object> iter = builder.build().iterator();
+    while (iter.hasNext()) {
+      records.add(iter.next());
+    }
+    return records.size();
+  }
+
+  private File generateFileWithSeq(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
+      throws IOException {
+    List<Integer> seq = generateSequences();
+    List<GenericData.Record> records = convertToRecords(seq);
+    return write(records, createWriterFunc);
+  }
+
+  private File generateFileWithSeqAndHole(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
+      throws IOException {
+    List<Integer> seq = generateSequencesWithHoles();
+    List<GenericData.Record> records = convertToRecords(seq);
+    return write(records, createWriterFunc);
+  }
+
+  private List<GenericData.Record> convertToRecords(List<Integer> seq) {
+    List<GenericData.Record> records = new ArrayList<>(seq.size());
+    for (int num : seq) {
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      record.put("intCol", num);
+      records.add(record);
+    }
+    return records;
+  }
+
+  private List<Integer> generateSequences() {
+    List<Integer> res = new ArrayList<>();
+    for (int i = 1; i <= recordCount; i++) {
+      res.add(i);
+    }
+    return res;
+  }
+
+  private List<Integer> generateSequencesWithHoles() {
+    List<Integer> res = new ArrayList<>();
+    for (int i = 1; i <= 3 * recordCount; i++) {
+      res.add(i * 3);
+    }
+    return res;
+  }
+
+  private File write(List<GenericData.Record> records,
+                     Function<MessageType, ParquetValueWriter<?>> createWriterFunc) throws IOException {
+    // We make it multiple row groups and pages, so that we can skip some pages
+    return writeRecords(temp,
+        schema,
+        ImmutableMap.of(
+            PARQUET_PAGE_SIZE_BYTES,
+            Integer.toString(recordPerPage * Integer.BYTES),
+            PARQUET_ROW_GROUP_SIZE_BYTES,
+            Integer.toString(recordPerRowGroup * Integer.BYTES)),
+        createWriterFunc,
+        records.toArray(new GenericData.Record[] {}));
+  }
+}
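A note on the expected counts in readAndCompare, with the arithmetic spelled out; this only restates the test configuration above. Pages hold roughly recordPerPage values because the page size is set to recordPerPage * Integer.BYTES. Column-index filtering prunes whole pages, and Iceberg's value readers do not re-apply the record filter row by row, so a successful lookup returns every row of the single surviving page rather than a single row.

public class ExpectedCountSketch {
  public static void main(String[] args) {
    int recordPerPage = 100;
    int lookupVal = 519530;

    // Sequential data 1..1,000,000: the value exists, one page survives pruning,
    // so the filtered read returns about one page worth of rows.
    int expectedWithSeq = recordPerPage;

    // Data with holes (multiples of 3 only): 519530 is not divisible by 3, so the value
    // is absent, every page is pruned and nothing is returned.
    int expectedWithHole = (lookupVal % 3 == 0) ? recordPerPage : 0;

    System.out.println(expectedWithSeq + " rows expected for the sequential file");
    System.out.println(expectedWithHole + " rows expected for the file with holes");
  }
}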