From cd70cac279d3f14ba61f0143f9988d4cc9413651 Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Thu, 8 Oct 2020 18:09:41 -0700 Subject: [PATCH 1/2] Parquet: Support Page Skipping in Iceberg Parquet Reader --- .../org/apache/iceberg/parquet/Parquet.java | 7 ++ .../iceberg/parquet/ParquetFilters.java | 69 +++++++++++- .../apache/iceberg/parquet/ParquetReader.java | 25 ++++- .../org/apache/iceberg/parquet/ReadConf.java | 4 + .../parquet/VectorizedParquetReader.java | 26 ++++- .../parquet/TestParquetColumnIndex.java | 105 ++++++++++++++++++ 6 files changed, 228 insertions(+), 8 deletions(-) create mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index fc35adc63f18..c506defff47c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -582,6 +582,13 @@ public CloseableIterable build() { optionsBuilder = ParquetReadOptions.builder(); } + if (filter != null && + schema.getAliases() != null && + ParquetFilters.isSupportedFilter(filter, schema, caseSensitive)) { + optionsBuilder.useRecordFilter(filterRecords); + optionsBuilder.withRecordFilter(ParquetFilters.convert(schema, filter, caseSensitive)); + } + for (Map.Entry entry : properties.entrySet()) { optionsBuilder.set(entry.getKey(), entry.getValue()); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java index d3e3df3a86d5..ee6309460228 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java @@ -20,7 +20,9 @@ package org.apache.iceberg.parquet; import java.nio.ByteBuffer; +import java.util.Set; import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.And; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundReference; import org.apache.iceberg.expressions.Expression; @@ -29,7 +31,10 @@ import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.expressions.Not; import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Type; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; @@ -38,9 +43,71 @@ class ParquetFilters { + private static final Set SUPPORTED_OPS = ImmutableSet.of( + Operation.IS_NULL, + Operation.NOT_NULL, + Operation.EQ, + Operation.NOT_EQ, + Operation.GT, + Operation.GT_EQ, + Operation.LT, + Operation.LT_EQ); + + private static final Set SUPPORTED_TYPES = ImmutableSet.of( + Type.TypeID.BOOLEAN, + Type.TypeID.INTEGER, + Type.TypeID.LONG, + Type.TypeID.FLOAT, + Type.TypeID.DOUBLE, + Type.TypeID.DATE, + Type.TypeID.TIME + ); + private ParquetFilters() { } + public static boolean isSupportedFilter(Expression expr, Schema schema, boolean caseSensitive) { + if (expr.op().equals(Operation.AND)) { + return isSupportedFilter(((And) expr).left(), schema, caseSensitive) && + isSupportedFilter(((And) expr).right(), schema, caseSensitive); + } else if 
(expr.op().equals(Operation.OR)) { + return isSupportedFilter(((And) expr).left(), schema, caseSensitive) && + isSupportedFilter(((And) expr).right(), schema, caseSensitive); + } else if (expr.op().equals(Operation.NOT)) { + return isSupportedFilter(((Not) expr).child(), schema, caseSensitive); + } else { + return isSupportedOp(expr) && isSupportedType(expr, schema, caseSensitive); + } + } + + private static boolean isSupportedOp(Expression expr) { + return SUPPORTED_OPS.contains(expr.op()); + } + + private static boolean isSupportedType(Expression expr, Schema schema, boolean caseSensitive) { + if (expr instanceof BoundPredicate) { + return checkBounded((BoundPredicate) expr); + } else if (expr instanceof UnboundPredicate) { + Expression bound = ((UnboundPredicate) expr).bind(schema.asStruct(), caseSensitive); + if (bound instanceof BoundPredicate) { + return checkBounded((BoundPredicate) bound); + } else if (bound == Expressions.alwaysTrue() || (bound == Expressions.alwaysFalse())) { + return true; + } + } + return false; + } + + private static boolean checkBounded(BoundPredicate pred) { + if (pred.term() instanceof BoundReference) { + BoundReference ref = (BoundReference) pred.term(); + if (SUPPORTED_TYPES.contains(ref.type().typeId())) { + return true; + } + } + return false; + } + static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) { FilterPredicate pred = ExpressionVisitors.visit(expr, new ConvertFilterToParquet(schema, caseSensitive)); // TODO: handle AlwaysFalse.INSTANCE @@ -231,7 +298,7 @@ private static class AlwaysFalse implements FilterPredicate { @Override public R accept(Visitor visitor) { - throw new UnsupportedOperationException("AlwaysTrue is a placeholder only"); + throw new UnsupportedOperationException("AlwaysFalse is a placeholder only"); } } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index 29e980def260..3e33e51bac68 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -20,6 +20,7 @@ package org.apache.iceberg.parquet; import java.io.IOException; +import java.util.List; import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -30,9 +31,11 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.schema.MessageType; public class ParquetReader extends CloseableGroup implements CloseableIterable { @@ -86,11 +89,14 @@ private static class FileIterator implements CloseableIterator { private final long totalValues; private final boolean reuseContainers; private final long[] rowGroupsStartRowPos; + private final boolean hasRecordFilter; private int nextRowGroup = 0; private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; + private List blocks; + private long skippedValues; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -99,11 +105,14 @@ private static class FileIterator implements CloseableIterator { this.totalValues = 
conf.totalValues(); this.reuseContainers = conf.reuseContainers(); this.rowGroupsStartRowPos = conf.startRowPositions(); + this.blocks = reader.getRowGroups(); + this.skippedValues = 0; + this.hasRecordFilter = conf.hasRecordFilter(); } @Override public boolean hasNext() { - return valuesRead < totalValues; + return valuesRead + skippedValues < totalValues; } @Override @@ -130,13 +139,23 @@ private void advance() { PageReadStore pages; try { - pages = reader.readNextRowGroup(); + // Because of the issue of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup() + if (hasRecordFilter) { + pages = reader.readNextFilteredRowGroup(); + } else { + pages = reader.readNextRowGroup(); + } } catch (IOException e) { throw new RuntimeIOException(e); } + long blockRowCount = blocks.get(nextRowGroup).getRowCount(); + Preconditions.checkState(blockRowCount >= pages.getRowCount(), + "Number of values in the block, %s, does not great or equal number of values after filtering, %s", + blockRowCount, pages.getRowCount()); long rowPosition = rowGroupsStartRowPos[nextRowGroup]; - nextRowGroupStart += pages.getRowCount(); + nextRowGroupStart += blockRowCount; + skippedValues += blockRowCount - pages.getRowCount(); nextRowGroup += 1; model.setPageSource(pages, rowPosition); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index cb816da8ce0e..1b9c8037262e 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -208,6 +208,10 @@ ReadConf copy() { return new ReadConf<>(this); } + boolean hasRecordFilter() { + return options.getRecordFilter() != null; + } + private static ParquetFileReader newReader(InputFile file, ParquetReadOptions options) { try { return ParquetFileReader.open(ParquetIO.file(file), options); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 481012cb8bbe..ad4556b3447d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -33,9 +33,11 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.schema.MessageType; @@ -95,10 +97,13 @@ private static class FileIterator implements CloseableIterator { private final int batchSize; private final List> columnChunkMetadata; private final boolean reuseContainers; + private final boolean hasRecordFilter; private int nextRowGroup = 0; private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; + private List blocks; + private long skippedValues; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -109,12 +114,14 @@ private static class FileIterator implements CloseableIterator { this.batchSize = conf.batchSize(); this.model.setBatchSize(this.batchSize); this.columnChunkMetadata = 
conf.columnChunkMetadataForRowGroups(); + this.blocks = reader.getRowGroups(); + this.skippedValues = 0; + this.hasRecordFilter = conf.hasRecordFilter(); } - @Override public boolean hasNext() { - return valuesRead < totalValues; + return valuesRead + skippedValues < totalValues; } @Override @@ -145,12 +152,23 @@ private void advance() { } PageReadStore pages; try { - pages = reader.readNextRowGroup(); + // Because of the issue of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup() + if (hasRecordFilter) { + pages = reader.readNextFilteredRowGroup(); + } else { + pages = reader.readNextRowGroup(); + } } catch (IOException e) { throw new RuntimeIOException(e); } + + long blockRowCount = blocks.get(nextRowGroup).getRowCount(); + Preconditions.checkState(blockRowCount >= pages.getRowCount(), + "Number of values in the block, %s, does not great or equal number of values after filtering, %s", + blockRowCount, pages.getRowCount()); model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); - nextRowGroupStart += pages.getRowCount(); + nextRowGroupStart += blockRowCount; + skippedValues += blockRowCount - pages.getRowCount(); nextRowGroup += 1; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java new file mode 100644 index 000000000000..c7684b57e0b2 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.parquet; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.parquet.Parquet.ReadBuilder; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES; +import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; + +public class TestParquetColumnIndex { + + private MessageType parquetSchema = new MessageType("schema", + new PrimitiveType(REQUIRED, INT32, "intCol")); + private Schema schema = ParquetSchemaUtil.convert(parquetSchema); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testColumnIndexFilter() throws IOException { + File parquetFile = generateFileWithMultiplePages(ParquetAvroWriter::buildWriter); + int totalCount = getPageRecordCount(parquetFile, null); + int filterCount = getPageRecordCount(parquetFile, + Expressions.and(Expressions.notNull("intCol"), Expressions.equal("intCol", 1))); + Assert.assertTrue(filterCount < totalCount); + } + + private int getPageRecordCount(File parquetFile, Expression expr) { + List records = new ArrayList<>(); + ReadBuilder builder = Parquet.read(localInput(parquetFile)) + .project(schema) + .filterRecords(true) + .filter(expr) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)); + if (expr != null) { + builder.filterRecords(true).filter(expr); + } + CloseableIterator iter = builder.build().iterator(); + while (iter.hasNext()) { + records.add(iter.next()); + } + return records.size(); + } + + private File generateFileWithMultiplePages(Function> createWriterFunc) + throws IOException { + + int recordNum = 1000000; + List records = new ArrayList<>(recordNum); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for (int i = 1; i <= recordNum; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", i); + records.add(record); + } + + // We make it 1000 pages, so that we can skip some + return writeRecords(temp, + schema, + ImmutableMap.of( + PARQUET_PAGE_SIZE_BYTES, + Integer.toString((recordNum / 1000) * Integer.BYTES)), + createWriterFunc, + records.toArray(new GenericData.Record[] {})); + } +} From e004fe25f1c396c55f512dfbbfaecdd985da525a Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Fri, 16 Oct 2020 16:30:34 -0700 Subject: [PATCH 2/2] Address feedback; Add tests; Use Parquet filter instead of reimplement --- .../org/apache/iceberg/parquet/Parquet.java | 27 +++---- .../iceberg/parquet/ParquetFilters.java | 49 ++----------- .../apache/iceberg/parquet/ParquetReader.java | 23 +----- 
.../org/apache/iceberg/parquet/ReadConf.java | 50 +++++-------- .../parquet/VectorizedParquetReader.java | 21 +----- .../parquet/TestParquetColumnIndex.java | 72 ++++++++++++++++--- 6 files changed, 104 insertions(+), 138 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index c506defff47c..8febb7205cac 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -46,6 +46,7 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterable; @@ -582,11 +583,10 @@ public CloseableIterable build() { optionsBuilder = ParquetReadOptions.builder(); } - if (filter != null && - schema.getAliases() != null && - ParquetFilters.isSupportedFilter(filter, schema, caseSensitive)) { + if (filter != null && !filter.equals(Expressions.alwaysTrue()) && + ParquetFilters.isSupportedFilter(filter)) { optionsBuilder.useRecordFilter(filterRecords); - optionsBuilder.withRecordFilter(ParquetFilters.convert(schema, filter, caseSensitive)); + optionsBuilder.withRecordFilter(ParquetFilters.convert(getSchemaFromFile(), filter, caseSensitive)); } for (Map.Entry entry : properties.entrySet()) { @@ -630,17 +630,10 @@ public CloseableIterable build() { if (filter != null) { // TODO: should not need to get the schema to push down before opening the file. // Parquet should allow setting a filter inside its read support - MessageType type; - try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) { - type = schemaReader.getFileMetaData().getSchema(); - } catch (IOException e) { - throw new RuntimeIOException(e); - } - Schema fileSchema = ParquetSchemaUtil.convert(type); builder.useStatsFilter() .useDictionaryFilter() .useRecordFilter(filterRecords) - .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive)); + .withFilter(ParquetFilters.convert(getSchemaFromFile(), filter, caseSensitive)); } else { // turn off filtering builder.useStatsFilter(false) @@ -662,6 +655,16 @@ public CloseableIterable build() { return new ParquetIterable<>(builder); } + + private Schema getSchemaFromFile() { + MessageType type; + try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) { + type = schemaReader.getFileMetaData().getSchema(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + return ParquetSchemaUtil.convert(type); + } } private static class ParquetReadBuilder extends ParquetReader.Builder { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java index ee6309460228..0ebfdba64ed6 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java @@ -34,7 +34,6 @@ import org.apache.iceberg.expressions.Not; import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.types.Type; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterApi; 
import org.apache.parquet.filter2.predicate.FilterPredicate; @@ -53,30 +52,20 @@ class ParquetFilters { Operation.LT, Operation.LT_EQ); - private static final Set SUPPORTED_TYPES = ImmutableSet.of( - Type.TypeID.BOOLEAN, - Type.TypeID.INTEGER, - Type.TypeID.LONG, - Type.TypeID.FLOAT, - Type.TypeID.DOUBLE, - Type.TypeID.DATE, - Type.TypeID.TIME - ); - private ParquetFilters() { } - public static boolean isSupportedFilter(Expression expr, Schema schema, boolean caseSensitive) { + public static boolean isSupportedFilter(Expression expr) { if (expr.op().equals(Operation.AND)) { - return isSupportedFilter(((And) expr).left(), schema, caseSensitive) && - isSupportedFilter(((And) expr).right(), schema, caseSensitive); + return isSupportedFilter(((And) expr).left()) && + isSupportedFilter(((And) expr).right()); } else if (expr.op().equals(Operation.OR)) { - return isSupportedFilter(((And) expr).left(), schema, caseSensitive) && - isSupportedFilter(((And) expr).right(), schema, caseSensitive); + return isSupportedFilter(((And) expr).left()) && + isSupportedFilter(((And) expr).right()); } else if (expr.op().equals(Operation.NOT)) { - return isSupportedFilter(((Not) expr).child(), schema, caseSensitive); + return isSupportedFilter(((Not) expr).child()); } else { - return isSupportedOp(expr) && isSupportedType(expr, schema, caseSensitive); + return isSupportedOp(expr); } } @@ -84,30 +73,6 @@ private static boolean isSupportedOp(Expression expr) { return SUPPORTED_OPS.contains(expr.op()); } - private static boolean isSupportedType(Expression expr, Schema schema, boolean caseSensitive) { - if (expr instanceof BoundPredicate) { - return checkBounded((BoundPredicate) expr); - } else if (expr instanceof UnboundPredicate) { - Expression bound = ((UnboundPredicate) expr).bind(schema.asStruct(), caseSensitive); - if (bound instanceof BoundPredicate) { - return checkBounded((BoundPredicate) bound); - } else if (bound == Expressions.alwaysTrue() || (bound == Expressions.alwaysFalse())) { - return true; - } - } - return false; - } - - private static boolean checkBounded(BoundPredicate pred) { - if (pred.term() instanceof BoundReference) { - BoundReference ref = (BoundReference) pred.term(); - if (SUPPORTED_TYPES.contains(ref.type().typeId())) { - return true; - } - } - return false; - } - static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) { FilterPredicate pred = ExpressionVisitors.visit(expr, new ConvertFilterToParquet(schema, caseSensitive)); // TODO: handle AlwaysFalse.INSTANCE diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index 3e33e51bac68..8f910cabbda9 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -20,7 +20,6 @@ package org.apache.iceberg.parquet; import java.io.IOException; -import java.util.List; import java.util.function.Function; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -31,11 +30,9 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.BlockMetaData; 
import org.apache.parquet.schema.MessageType; public class ParquetReader extends CloseableGroup implements CloseableIterable { @@ -84,7 +81,6 @@ public CloseableIterator iterator() { private static class FileIterator implements CloseableIterator { private final ParquetFileReader reader; - private final boolean[] shouldSkip; private final ParquetValueReader model; private final long totalValues; private final boolean reuseContainers; @@ -95,24 +91,19 @@ private static class FileIterator implements CloseableIterator { private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; - private List blocks; - private long skippedValues; FileIterator(ReadConf conf) { this.reader = conf.reader(); - this.shouldSkip = conf.shouldSkip(); this.model = conf.model(); this.totalValues = conf.totalValues(); this.reuseContainers = conf.reuseContainers(); this.rowGroupsStartRowPos = conf.startRowPositions(); - this.blocks = reader.getRowGroups(); - this.skippedValues = 0; this.hasRecordFilter = conf.hasRecordFilter(); } @Override public boolean hasNext() { - return valuesRead + skippedValues < totalValues; + return valuesRead < totalValues; } @Override @@ -132,11 +123,6 @@ public T next() { } private void advance() { - while (shouldSkip[nextRowGroup]) { - nextRowGroup += 1; - reader.skipNextRowGroup(); - } - PageReadStore pages; try { // Because of the issue of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup() @@ -149,13 +135,8 @@ private void advance() { throw new RuntimeIOException(e); } - long blockRowCount = blocks.get(nextRowGroup).getRowCount(); - Preconditions.checkState(blockRowCount >= pages.getRowCount(), - "Number of values in the block, %s, does not great or equal number of values after filtering, %s", - blockRowCount, pages.getRowCount()); long rowPosition = rowGroupsStartRowPos[nextRowGroup]; - nextRowGroupStart += blockRowCount; - skippedValues += blockRowCount - pages.getRowCount(); + nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; model.setPageSource(pages, rowPosition); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index 1b9c8037262e..36212c8f645c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -54,7 +54,6 @@ class ReadConf { private final ParquetValueReader model; private final VectorizedReader vectorizedModel; private final List rowGroups; - private final boolean[] shouldSkip; private final long totalValues; private final boolean reuseContainers; private final Integer batchSize; @@ -85,34 +84,28 @@ class ReadConf { this.projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema); } + // ParquetFileReader has filters(stats, dictionary and future bloomfilter) in the constructor, + // so getRowGroups returns filtered row groups this.rowGroups = reader.getRowGroups(); - this.shouldSkip = new boolean[rowGroups.size()]; // Fetch all row groups starting positions to compute the row offsets of the filtered row groups Map offsetToStartPos = generateOffsetToStartPos(); this.startRowPositions = new long[rowGroups.size()]; - ParquetMetricsRowGroupFilter statsFilter = null; - ParquetDictionaryRowGroupFilter dictFilter = null; - if (filter != null) { - statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive); - dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive); - } - long 
computedTotalValues = 0L; - for (int i = 0; i < shouldSkip.length; i += 1) { + // If a row group has 0 counts after filtering, it won't be added to rowGroups + for (int i = 0; i < rowGroups.size(); i += 1) { BlockMetaData rowGroup = rowGroups.get(i); startRowPositions[i] = offsetToStartPos.get(rowGroup.getStartingPos()); - boolean shouldRead = filter == null || ( - statsFilter.shouldRead(typeWithIds, rowGroup) && - dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup))); - this.shouldSkip[i] = !shouldRead; - if (shouldRead) { - computedTotalValues += rowGroup.getRowCount(); - } + computedTotalValues += rowGroup.getRowCount(); } - this.totalValues = computedTotalValues; + // Because of the issue of PARQUET-1901, we cannot blindly call getFilteredRecordCount() + if (filter != null) { + this.totalValues = reader.getFilteredRecordCount(); + } else { + this.totalValues = computedTotalValues; + } if (readerFunc != null) { this.model = (ParquetValueReader) readerFunc.apply(typeWithIds); this.vectorizedModel = null; @@ -134,7 +127,6 @@ private ReadConf(ReadConf toCopy) { this.projection = toCopy.projection; this.model = toCopy.model; this.rowGroups = toCopy.rowGroups; - this.shouldSkip = toCopy.shouldSkip; this.totalValues = toCopy.totalValues; this.reuseContainers = toCopy.reuseContainers; this.batchSize = toCopy.batchSize; @@ -162,10 +154,6 @@ VectorizedReader vectorizedModel() { return vectorizedModel; } - boolean[] shouldSkip() { - return shouldSkip; - } - private Map generateOffsetToStartPos() { try (ParquetFileReader fileReader = newReader(file, ParquetReadOptions.builder().build())) { Map offsetToStartPos = new HashMap<>(); @@ -225,16 +213,12 @@ private List> getColumnChunkMetadataForRowG .map(columnDescriptor -> ColumnPath.get(columnDescriptor.getPath())).collect(Collectors.toSet()); ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (int i = 0; i < rowGroups.size(); i++) { - if (!shouldSkip[i]) { - BlockMetaData blockMetaData = rowGroups.get(i); - ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); - blockMetaData.getColumns().stream() - .filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath())) - .forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData)); - listBuilder.add(mapBuilder.build()); - } else { - listBuilder.add(ImmutableMap.of()); - } + BlockMetaData blockMetaData = rowGroups.get(i); + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + blockMetaData.getColumns().stream() + .filter(columnChunkMetaData -> projectedColumns.contains(columnChunkMetaData.getPath())) + .forEach(columnChunkMetaData -> mapBuilder.put(columnChunkMetaData.getPath(), columnChunkMetaData)); + listBuilder.add(mapBuilder.build()); } return listBuilder.build(); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index ad4556b3447d..5f5390bbe1d7 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -33,11 +33,9 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import 
org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.schema.MessageType; @@ -91,7 +89,6 @@ public CloseableIterator iterator() { private static class FileIterator implements CloseableIterator { private final ParquetFileReader reader; - private final boolean[] shouldSkip; private final VectorizedReader model; private final long totalValues; private final int batchSize; @@ -102,26 +99,21 @@ private static class FileIterator implements CloseableIterator { private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; - private List blocks; - private long skippedValues; FileIterator(ReadConf conf) { this.reader = conf.reader(); - this.shouldSkip = conf.shouldSkip(); this.totalValues = conf.totalValues(); this.reuseContainers = conf.reuseContainers(); this.model = conf.vectorizedModel(); this.batchSize = conf.batchSize(); this.model.setBatchSize(this.batchSize); this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); - this.blocks = reader.getRowGroups(); - this.skippedValues = 0; this.hasRecordFilter = conf.hasRecordFilter(); } @Override public boolean hasNext() { - return valuesRead + skippedValues < totalValues; + return valuesRead < totalValues; } @Override @@ -146,10 +138,6 @@ public T next() { } private void advance() { - while (shouldSkip[nextRowGroup]) { - nextRowGroup += 1; - reader.skipNextRowGroup(); - } PageReadStore pages; try { // Because of the issue of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup() @@ -162,13 +150,8 @@ private void advance() { throw new RuntimeIOException(e); } - long blockRowCount = blocks.get(nextRowGroup).getRowCount(); - Preconditions.checkState(blockRowCount >= pages.getRowCount(), - "Number of values in the block, %s, does not great or equal number of values after filtering, %s", - blockRowCount, pages.getRowCount()); model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); - nextRowGroupStart += blockRowCount; - skippedValues += blockRowCount - pages.getRowCount(); + nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java index c7684b57e0b2..850b75ecccec 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetColumnIndex.java @@ -42,26 +42,44 @@ import static org.apache.iceberg.Files.localInput; import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; public class TestParquetColumnIndex { + private final int recordName = 1000000; + private final int recordPerPage = 100; + private final int recordPerRowGroup = 100000; + private final int lookupVal = 519530; + private MessageType parquetSchema = new MessageType("schema", new PrimitiveType(REQUIRED, INT32, "intCol")); private Schema schema = ParquetSchemaUtil.convert(parquetSchema); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); @Rule 
public TemporaryFolder temp = new TemporaryFolder(); @Test public void testColumnIndexFilter() throws IOException { - File parquetFile = generateFileWithMultiplePages(ParquetAvroWriter::buildWriter); + File parquetFile = generateFileWithSeq(ParquetAvroWriter::buildWriter); + readAndCompare(parquetFile, recordPerPage); + } + + @Test + public void testColumnIndexFilterWithHole() throws IOException { + File parquetFile = generateFileWithSeqAndHole(ParquetAvroWriter::buildWriter); + readAndCompare(parquetFile, 0); + } + + private void readAndCompare(File parquetFile, int expectedCount) { int totalCount = getPageRecordCount(parquetFile, null); int filterCount = getPageRecordCount(parquetFile, - Expressions.and(Expressions.notNull("intCol"), Expressions.equal("intCol", 1))); + Expressions.and(Expressions.notNull("intCol"), Expressions.equal("intCol", lookupVal))); Assert.assertTrue(filterCount < totalCount); + Assert.assertTrue(filterCount == expectedCount); } private int getPageRecordCount(File parquetFile, Expression expr) { @@ -81,24 +99,56 @@ private int getPageRecordCount(File parquetFile, Expression expr) { return records.size(); } - private File generateFileWithMultiplePages(Function> createWriterFunc) - throws IOException { + private File generateFileWithSeq(Function> createWriterFunc) + throws IOException { + List seq = generateSequences(); + List records = convertToRecords(seq); + return write(records, createWriterFunc); + } + + private File generateFileWithSeqAndHole(Function> createWriterFunc) + throws IOException { + List seq = generateSequencesWithHoles(); + List records = convertToRecords(seq); + return write(records, createWriterFunc); + } - int recordNum = 1000000; - List records = new ArrayList<>(recordNum); - org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); - for (int i = 1; i <= recordNum; i++) { + private List convertToRecords(List seq) { + List records = new ArrayList<>(recordName); + for (int num : seq) { GenericData.Record record = new GenericData.Record(avroSchema); - record.put("intCol", i); + record.put("intCol", num); records.add(record); } + return records; + } + + private List generateSequences() { + List res = new ArrayList<>(); + for (int i = 1; i <= recordName; i++) { + res.add(i); + } + return res; + } + + private List generateSequencesWithHoles() { + List res = new ArrayList<>(); + for (int i = 1; i <= 3 * recordName; i++) { + res.add(i * 3); + } + return res; + } - // We make it 1000 pages, so that we can skip some + private File write(List records, + Function> createWriterFunc) throws IOException { + // We make it multiple row groups and pages, so that we can skip some pages return writeRecords(temp, schema, ImmutableMap.of( PARQUET_PAGE_SIZE_BYTES, - Integer.toString((recordNum / 1000) * Integer.BYTES)), + Integer.toString(recordPerPage * Integer.BYTES), + PARQUET_ROW_GROUP_SIZE_BYTES, + Integer.toString(recordPerRowGroup * Integer.BYTES)), createWriterFunc, records.toArray(new GenericData.Record[] {})); }
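
Illustrative note (not part of the diffs above): the record-filter push-down in Parquet.java is gated by ParquetFilters.isSupportedFilter, which recursively walks AND/OR/NOT nodes down to the leaf operations and only accepts operations the Parquet record filter (and therefore the column-index page filter) can evaluate. Below is a minimal standalone sketch of that walk. It assumes org.apache.iceberg.expressions.Or exposes left()/right() accessors analogous to And; the class name SupportedFilterCheck and method name isSupported are made up for the sketch.

import java.util.Set;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expression.Operation;
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

final class SupportedFilterCheck {
  // Leaf operations that the Parquet record filter can evaluate.
  private static final Set<Operation> SUPPORTED_OPS = ImmutableSet.of(
      Operation.IS_NULL, Operation.NOT_NULL,
      Operation.EQ, Operation.NOT_EQ,
      Operation.GT, Operation.GT_EQ,
      Operation.LT, Operation.LT_EQ);

  private SupportedFilterCheck() {
  }

  // Returns true only if every leaf predicate in the expression tree uses a supported operation.
  static boolean isSupported(Expression expr) {
    if (expr.op() == Operation.AND) {
      return isSupported(((And) expr).left()) && isSupported(((And) expr).right());
    } else if (expr.op() == Operation.OR) {
      // Assumes Or has left()/right(), mirroring And.
      return isSupported(((Or) expr).left()) && isSupported(((Or) expr).right());
    } else if (expr.op() == Operation.NOT) {
      return isSupported(((Not) expr).child());
    } else {
      return SUPPORTED_OPS.contains(expr.op());
    }
  }
}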
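
Illustrative note (not part of the diffs above): a caller-side sketch of how the page-skipping read path is exercised, modeled on the read-builder calls used in TestParquetColumnIndex (project/filterRecords/filter/createReaderFunc). The file path, field id, and lookup value are placeholders. With filterRecords(true) and a supported filter, the builder installs a Parquet record filter, so the reader's advance() takes the readNextFilteredRowGroup() branch and pages ruled out by the column index are never decoded.

import static org.apache.iceberg.Files.localInput;

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

public class PageSkippingReadExample {
  public static void main(String[] args) throws IOException {
    // Placeholder schema: a single required int column, matching the test fixture.
    Schema schema = new Schema(
        Types.NestedField.required(1, "intCol", Types.IntegerType.get()));

    File dataFile = new File("/tmp/data.parquet"); // placeholder path

    try (CloseableIterable<Record> reader =
        Parquet.read(localInput(dataFile))
            .project(schema)
            .filterRecords(true)                         // enable row-level record filtering
            .filter(Expressions.equal("intCol", 519530)) // pushed down; eligible pages are skipped
            .createReaderFunc(fileSchema ->
                GenericParquetReaders.buildReader(schema, fileSchema))
            .build()) {
      for (Record record : reader) {
        System.out.println(record);
      }
    }
  }
}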