diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java index 2175293ab2b6..daa116f292ed 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java @@ -42,9 +42,15 @@ protected BaseBatchReader(List> readers) { @Override public void setRowGroupInfo( PageReadStore pageStore, Map metaData, long rowPosition) { + setRowGroupInfo(pageStore, metaData); + } + + @Override + public void setRowGroupInfo( + PageReadStore pageStore, Map metaData) { for (VectorizedArrowReader reader : readers) { if (reader != null) { - reader.setRowGroupInfo(pageStore, metaData, rowPosition); + reader.setRowGroupInfo(pageStore, metaData); } } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 27ee25124f16..806e723eaeea 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -432,6 +432,11 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF @Override public void setRowGroupInfo( PageReadStore source, Map metadata, long rowPosition) { + setRowGroupInfo(source, metadata); + } + + @Override + public void setRowGroupInfo(PageReadStore source, Map metadata) { ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath())); this.dictionary = vectorizedColumnIterator.setRowGroupInfo( @@ -475,6 +480,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { public void setRowGroupInfo( PageReadStore source, Map metadata, long rowPosition) {} + @Override + public void setRowGroupInfo( + PageReadStore source, Map metadata) {} + @Override public String toString() { return "NullReader"; @@ -541,7 +550,13 @@ private static NullabilityHolder newNullabilityHolder(int size) { @Override public void setRowGroupInfo( PageReadStore source, Map metadata, long rowPosition) { - this.rowStart = rowPosition; + setRowGroupInfo(source, metadata); + } + + @Override + public void setRowGroupInfo( + PageReadStore source, Map metadata) { + this.rowStart = source.getRowIndexOffset().orElse(0L); } @Override @@ -586,6 +601,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { public void setRowGroupInfo( PageReadStore source, Map metadata, long rowPosition) {} + @Override + public void setRowGroupInfo( + PageReadStore source, Map metadata) {} + @Override public String toString() { return String.format("ConstantReader: %s", value); @@ -613,6 +632,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { public void setRowGroupInfo( PageReadStore source, Map metadata, long rowPosition) {} + @Override + public void setRowGroupInfo( + PageReadStore source, Map metadata) {} + @Override public String toString() { return "DeletedVectorReader"; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java index 647397fad670..7094e0147798 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.parquet; +import 
java.util.Optional; +import java.util.PrimitiveIterator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.page.DataPage; @@ -26,6 +28,7 @@ @SuppressWarnings("checkstyle:VisibilityModifier") public abstract class BaseColumnIterator { protected final ColumnDescriptor desc; + protected final int definitionLevel; // state reset for each row group protected PageReader pageSource = null; @@ -34,15 +37,31 @@ public abstract class BaseColumnIterator { protected long advanceNextPageCount = 0L; protected Dictionary dictionary; + // state for page skipping + protected boolean needsSynchronizing; + protected PrimitiveIterator.OfLong rowIndexes; + protected long currentRowIndex; + protected int numValuesToSkip; + protected BaseColumnIterator(ColumnDescriptor descriptor) { this.desc = descriptor; + this.definitionLevel = desc.getMaxDefinitionLevel() - 1; } public void setPageSource(PageReader source) { + setPageSource(source, Optional.empty()); + } + + public void setPageSource( + PageReader source, Optional optionalRowIndexes) { this.pageSource = source; this.triplesCount = source.getTotalValueCount(); this.triplesRead = 0L; this.advanceNextPageCount = 0L; + this.needsSynchronizing = optionalRowIndexes.isPresent(); + if (needsSynchronizing) { + this.rowIndexes = optionalRowIndexes.get(); + } BasePageIterator pageIterator = pageIterator(); pageIterator.reset(); dictionary = ParquetUtil.readDictionary(desc, pageSource); @@ -60,6 +79,17 @@ protected void advance() { if (page != null) { pageIterator.setPage(page); this.advanceNextPageCount += pageIterator.currentPageCount(); + + if (needsSynchronizing) { + long firstRowIndex = + page.getFirstRowIndex() + .orElseThrow( + () -> + new IllegalStateException( + "Index of first row in page is not available")); + this.numValuesToSkip = 0; + this.currentRowIndex = firstRowIndex - 1; + } } else { return; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java index 6389177bff63..17eac92bd064 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java @@ -71,6 +71,10 @@ protected void reset() { this.hasNext = false; } + protected void skip(int numValuesToSkip) { + throw new UnsupportedOperationException(); + } + protected abstract void initDataReader( Encoding dataEncoding, ByteBufferInputStream in, int valueCount); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java index 1c0ea4829eb8..8f1c4e318cc6 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.parquet; +import java.util.Optional; +import java.util.PrimitiveIterator; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.PageReader; import org.apache.parquet.io.api.Binary; public abstract class ColumnIterator extends BaseColumnIterator implements TripleIterator { @@ -89,6 +92,7 @@ public Binary next() { } private final PageIterator pageIterator; + private long targetRowIndex = Long.MIN_VALUE; private ColumnIterator(ColumnDescriptor desc, String writerVersion) { super(desc); @@ -160,4 +164,49 @@ public N nextNull() { protected BasePageIterator 
pageIterator() { return pageIterator; } + + @Override + public void setPageSource(PageReader source) { + setPageSource(source, Optional.empty()); + } + + @Override + public void setPageSource(PageReader source, Optional rowIndexes) { + super.setPageSource(source, rowIndexes); + if (rowIndexes.isPresent()) { + this.targetRowIndex = Long.MIN_VALUE; + } + } + + @Override + public boolean needsSynchronizing() { + return needsSynchronizing; + } + + @Override + public void synchronize() { + numValuesToSkip = 0; + while (hasNext()) { + advance(); + if (pageIterator.currentRepetitionLevel() == 0) { + currentRowIndex += 1; + if (currentRowIndex > targetRowIndex) { + targetRowIndex = rowIndexes.hasNext() ? rowIndexes.nextLong() : Long.MAX_VALUE; + } + } + + if (currentRowIndex < targetRowIndex) { + triplesRead += 1; + if (pageIterator.currentDefinitionLevel() > definitionLevel) { + numValuesToSkip += 1; + } + + pageIterator.advance(); + } else { + break; + } + } + + pageIterator.skip(numValuesToSkip); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java index 34383352bf68..0a4ea450f2cd 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java @@ -189,7 +189,7 @@ public V nextNull() { return null; } - private void advance() { + protected void advance() { if (triplesRead < triplesCount) { this.currentDL = definitionLevels.nextInt(); this.currentRL = repetitionLevels.nextInt(); @@ -202,6 +202,11 @@ private void advance() { } } + @Override + protected void skip(int numValuesToSkip) { + values.skip(numValuesToSkip); + } + RuntimeException handleRuntimeException(RuntimeException exception) { if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, valueEncoding) && exception instanceof ArrayIndexOutOfBoundsException) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index d591041d19c3..edbdf1626005 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -105,6 +105,7 @@ import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetOutputFormat; @@ -114,10 +115,14 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Parquet { private Parquet() {} + private static final Logger LOG = LoggerFactory.getLogger(Parquet.class); + private static final Collection READ_PROPERTIES_TO_REMOVE = Sets.newHashSet( "parquet.read.filter", @@ -1165,7 +1170,27 @@ public CloseableIterable build() { optionsBuilder.withDecryption(fileDecryptionProperties); } + // TODO: for now, apply filter only for non-vectorized read + if (filter != null && batchedReaderFunc == null) { + MessageType type = getSchemaFromFile(fileDecryptionProperties); + Schema fileSchema = ParquetSchemaUtil.convert(type); + try { + FilterCompat.Filter convertedFilter = + 
ParquetFilters.convert(type, fileSchema, filter, caseSensitive); + optionsBuilder.useRecordFilter(); + optionsBuilder.withRecordFilter(convertedFilter); + } catch (Exception e) { + // no record filter to use + optionsBuilder.useRecordFilter(false); + LOG.warn("ReadBuilder: filter conversion threw exception", e); + } + } + ParquetReadOptions options = optionsBuilder.build(); + LOG.info( + "ReadBuilder: options: useRecordFilter: {}, recordFilter: {}", + options.useRecordFilter(), + options.getRecordFilter()); NameMapping mapping; if (nameMapping != null) { @@ -1218,15 +1243,7 @@ public CloseableIterable build() { if (filter != null) { // TODO: should not need to get the schema to push down before opening the file. // Parquet should allow setting a filter inside its read support - ParquetReadOptions decryptOptions = - ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build(); - MessageType type; - try (ParquetFileReader schemaReader = - ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { - type = schemaReader.getFileMetaData().getSchema(); - } catch (IOException e) { - throw new RuntimeIOException(e); - } + MessageType type = getSchemaFromFile(fileDecryptionProperties); Schema fileSchema = ParquetSchemaUtil.convert(type); builder .useStatsFilter() @@ -1261,6 +1278,20 @@ public CloseableIterable build() { return new ParquetIterable<>(builder); } + + private MessageType getSchemaFromFile(FileDecryptionProperties fileDecryptionProperties) { + ParquetReadOptions decryptOptions = + ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build(); + MessageType type; + try (ParquetFileReader schemaReader = + ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { + type = schemaReader.getFileMetaData().getSchema(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + + return type; + } } private static class ParquetReadBuilder extends ParquetReader.Builder { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java index fc6febe19438..bf58d48a1eee 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java @@ -29,19 +29,23 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.types.Type; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; class ParquetFilters { private ParquetFilters() {} - static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) { + static FilterCompat.Filter convert( + MessageType mType, Schema schema, Expression expr, boolean caseSensitive) { FilterPredicate pred = - ExpressionVisitors.visit(expr, new ConvertFilterToParquet(schema, caseSensitive)); + ExpressionVisitors.visit(expr, new ConvertFilterToParquet(mType, schema, caseSensitive)); // TODO: handle AlwaysFalse.INSTANCE if (pred != null && pred != AlwaysTrue.INSTANCE) { // FilterCompat will apply LogicalInverseRewriter @@ -51,11 +55,17 @@ static FilterCompat.Filter convert(Schema schema, Expression 
expr, boolean caseS } } + static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) { + return convert(null, schema, expr, caseSensitive); + } + private static class ConvertFilterToParquet extends ExpressionVisitor { + private final MessageType mType; private final Schema schema; private final boolean caseSensitive; - private ConvertFilterToParquet(Schema schema, boolean caseSensitive) { + private ConvertFilterToParquet(MessageType mType, Schema schema, boolean caseSensitive) { + this.mType = mType; this.schema = schema; this.caseSensitive = caseSensitive; } @@ -127,6 +137,27 @@ public FilterPredicate predicate(BoundPredicate pred) { throw new UnsupportedOperationException("Cannot convert to Parquet filter: " + pred); } + String errMsg = "Cannot convert to Parquet filter: " + pred; + if (mType != null) { + // We create a Parquet filter predicate and that predicate uses a Parquet column. + // We need to ensure that the Parquet column type converted from the Iceberg type of + // the column matches the Parquet type in the Parquet file. (If the filter is passed + // to Parquet and used by Parquet to filter row groups, Parquet checks that the type + // in the predicate matches the type in the file as a validation step before filtering.) + // If the two do not match, we abort the conversion. + org.apache.parquet.schema.Type pType = mType.getType(path); + if (!(pType instanceof PrimitiveType)) { + throw new UnsupportedOperationException(errMsg); + } else { + PrimitiveType.PrimitiveTypeName typeName = ((PrimitiveType) pType).getPrimitiveTypeName(); + String expected = predicateType(typeName); + String actual = predicateType(ref.type().typeId()); + if (!actual.equals(expected)) { + throw new UnsupportedOperationException(errMsg); + } + } + } + switch (ref.type().typeId()) { case BOOLEAN: Operators.BooleanColumn col = FilterApi.booleanColumn(path); @@ -156,7 +187,54 @@ public FilterPredicate predicate(BoundPredicate pred) { return pred(op, FilterApi.binaryColumn(path), getParquetPrimitive(lit)); } - throw new UnsupportedOperationException("Cannot convert to Parquet filter: " + pred); + throw new UnsupportedOperationException(errMsg); + } + + private String predicateType(PrimitiveType.PrimitiveTypeName typeName) { + switch (typeName) { + case BOOLEAN: + return "boolean"; + case INT32: + return "int"; + case INT64: + return "long"; + case FLOAT: + return "float"; + case DOUBLE: + return "double"; + case INT96: + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return "binary"; + default: + return "unsupported"; + } + } + + private String predicateType(Type.TypeID typeId) { + switch (typeId) { + case BOOLEAN: + return "boolean"; + case INTEGER: + case DATE: + return "int"; + case LONG: + case TIME: + case TIMESTAMP: + return "long"; + case FLOAT: + return "float"; + case DOUBLE: + return "double"; + case STRING: + case UUID: + case FIXED: + case BINARY: + case DECIMAL: + return "binary"; + default: + return "unsupported"; + } } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index c1d8b0ccbbad..885c8a004651 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -99,7 +99,6 @@ private static class FileIterator implements CloseableIterator { private final ParquetValueReader model; private final long totalValues; private final boolean reuseContainers; - private final 
long[] rowGroupsStartRowPos; private int nextRowGroup = 0; private long nextRowGroupStart = 0; @@ -112,7 +111,6 @@ private static class FileIterator implements CloseableIterator { this.model = conf.model(); this.totalValues = conf.totalValues(); this.reuseContainers = conf.reuseContainers(); - this.rowGroupsStartRowPos = conf.startRowPositions(); } @Override @@ -139,21 +137,19 @@ public T next() { private void advance() { while (shouldSkip[nextRowGroup]) { nextRowGroup += 1; - reader.skipNextRowGroup(); } PageReadStore pages; try { - pages = reader.readNextRowGroup(); + pages = reader.readFilteredRowGroup(nextRowGroup); } catch (IOException e) { throw new RuntimeIOException(e); } - long rowPosition = rowGroupsStartRowPos[nextRowGroup]; nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; - model.setPageSource(pages, rowPosition); + model.setPageSource(pages); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java index b6c2b5b70303..1187c4197b81 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java @@ -28,5 +28,14 @@ public interface ParquetValueReader { List> columns(); + /** + * @deprecated since 1.6.0, will be removed in 1.7.0; use setPageSource(PageReadStore) instead. + */ + @Deprecated void setPageSource(PageReadStore pageStore, long rowPosition); + + default void setPageSource(PageReadStore pageStore) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement setPageSource(PageReadStore)"); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index c1f76e7bdb9a..081fcabe3664 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -27,6 +27,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.PrimitiveIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -113,6 +115,9 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) {} + + @Override + public void setPageSource(PageReadStore pageStore) {} } static class ConstantReader implements ParquetValueReader { @@ -176,16 +181,24 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) {} + + @Override + public void setPageSource(PageReadStore pageStore) {} } static class PositionReader implements ParquetValueReader { private long rowOffset = -1; private long rowGroupStart; + private PrimitiveIterator.OfLong rowIndexes; @Override public Long read(Long reuse) { - rowOffset = rowOffset + 1; - return rowGroupStart + rowOffset; + if (rowIndexes != null) { + return rowIndexes.nextLong(); + } else { + rowOffset = rowOffset + 1; + return rowGroupStart + rowOffset; + } } @Override @@ -200,8 +213,17 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - this.rowGroupStart = rowPosition; + setPageSource(pageStore); + } + + @Override + public void 
setPageSource(PageReadStore pageStore) { + this.rowGroupStart = pageStore.getRowIndexOffset().orElse(0L); this.rowOffset = -1; + Optional optionalRowIndexes = pageStore.getRowIndexes(); + if (optionalRowIndexes.isPresent()) { + this.rowIndexes = optionalRowIndexes.get(); + } } } @@ -221,7 +243,12 @@ protected PrimitiveReader(ColumnDescriptor desc) { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - column.setPageSource(pageStore.getPageReader(desc)); + setPageSource(pageStore); + } + + @Override + public void setPageSource(PageReadStore pageStore) { + column.setPageSource(pageStore.getPageReader(desc), pageStore.getRowIndexes()); } @Override @@ -405,7 +432,12 @@ private static class OptionReader implements ParquetValueReader { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - reader.setPageSource(pageStore, rowPosition); + setPageSource(pageStore); + } + + @Override + public void setPageSource(PageReadStore pageStore) { + reader.setPageSource(pageStore); } @Override @@ -450,7 +482,12 @@ protected RepeatedReader( @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - reader.setPageSource(pageStore, rowPosition); + setPageSource(pageStore); + } + + @Override + public void setPageSource(PageReadStore pageStore) { + reader.setPageSource(pageStore); } @Override @@ -569,8 +606,13 @@ protected RepeatedKeyValueReader( @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - keyReader.setPageSource(pageStore, rowPosition); - valueReader.setPageSource(pageStore, rowPosition); + setPageSource(pageStore); + } + + @Override + public void setPageSource(PageReadStore pageStore) { + keyReader.setPageSource(pageStore); + valueReader.setPageSource(pageStore); } @Override @@ -703,6 +745,7 @@ private interface Setter { private final ParquetValueReader[] readers; private final TripleIterator column; private final List> children; + private boolean topLevel = false; @SuppressWarnings("unchecked") protected StructReader(List types, List> readers) { @@ -725,10 +768,19 @@ protected StructReader(List types, List> readers) { this.column = firstNonNullColumn(children); } + public final void topLevel() { + this.topLevel = true; + } + @Override public final void setPageSource(PageReadStore pageStore, long rowPosition) { + setPageSource(pageStore); + } + + @Override + public final void setPageSource(PageReadStore pageStore) { for (ParquetValueReader reader : readers) { - reader.setPageSource(pageStore, rowPosition); + reader.setPageSource(pageStore); } } @@ -739,6 +791,12 @@ public final TripleIterator column() { @Override public final T read(T reuse) { + if (topLevel && column.needsSynchronizing()) { + for (TripleIterator child : children) { + child.synchronize(); + } + } + I intermediate = newStructData(reuse); for (int i = 0; i < readers.length; i += 1) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index da91e4dfa56a..1ae28474059e 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -19,13 +19,11 @@ package org.apache.iceberg.parquet; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; 
import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; @@ -33,9 +31,7 @@ import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -59,7 +55,6 @@ class ReadConf { private final long totalValues; private final boolean reuseContainers; private final Integer batchSize; - private final long[] startRowPositions; // List of column chunk metadata for each row group private final List> columnChunkMetaDataForRowGroups; @@ -95,10 +90,6 @@ class ReadConf { this.rowGroups = reader.getRowGroups(); this.shouldSkip = new boolean[rowGroups.size()]; - this.startRowPositions = new long[rowGroups.size()]; - - // Fetch all row groups starting positions to compute the row offsets of the filtered row groups - Map offsetToStartPos = generateOffsetToStartPos(expectedSchema); ParquetMetricsRowGroupFilter statsFilter = null; ParquetDictionaryRowGroupFilter dictFilter = null; @@ -112,8 +103,6 @@ class ReadConf { long computedTotalValues = 0L; for (int i = 0; i < shouldSkip.length; i += 1) { BlockMetaData rowGroup = rowGroups.get(i); - startRowPositions[i] = - offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos()); boolean shouldRead = filter == null || (statsFilter.shouldRead(typeWithIds, rowGroup) @@ -127,15 +116,20 @@ class ReadConf { } } - this.totalValues = computedTotalValues; if (readerFunc != null) { this.model = (ParquetValueReader) readerFunc.apply(typeWithIds); this.vectorizedModel = null; this.columnChunkMetaDataForRowGroups = null; + if (options.useRecordFilter()) { + this.totalValues = reader.getFilteredRecordCount(); + } else { + this.totalValues = computedTotalValues; + } } else { this.model = null; this.vectorizedModel = (VectorizedReader) batchedReaderFunc.apply(typeWithIds); this.columnChunkMetaDataForRowGroups = getColumnChunkMetadataForRowGroups(); + this.totalValues = computedTotalValues; } this.reuseContainers = reuseContainers; @@ -155,7 +149,6 @@ private ReadConf(ReadConf toCopy) { this.batchSize = toCopy.batchSize; this.vectorizedModel = toCopy.vectorizedModel; this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups; - this.startRowPositions = toCopy.startRowPositions; } ParquetFileReader reader() { @@ -181,38 +174,6 @@ boolean[] shouldSkip() { return shouldSkip; } - private Map generateOffsetToStartPos(Schema schema) { - if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) { - return null; - } - - FileDecryptionProperties decryptionProperties = - (options == null) ? 
null : options.getDecryptionProperties(); - - ParquetReadOptions readOptions = - ParquetReadOptions.builder().withDecryption(decryptionProperties).build(); - - try (ParquetFileReader fileReader = newReader(file, readOptions)) { - Map offsetToStartPos = Maps.newHashMap(); - - long curRowCount = 0; - for (int i = 0; i < fileReader.getRowGroups().size(); i += 1) { - BlockMetaData meta = fileReader.getRowGroups().get(i); - offsetToStartPos.put(meta.getStartingPos(), curRowCount); - curRowCount += meta.getRowCount(); - } - - return offsetToStartPos; - - } catch (IOException e) { - throw new UncheckedIOException("Failed to create/close reader for file: " + file, e); - } - } - - long[] startRowPositions() { - return startRowPositions; - } - long totalValues() { return totalValues; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java index 5a833d4c4447..a04fc6839485 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java @@ -129,4 +129,16 @@ default Binary nextBinary() { * @throws java.util.NoSuchElementException if there are no more elements */ N nextNull(); + + /** + * Returns true when some triples in this iterator might need to be skipped. + * + * @return whether this iterator needs synchronizing + */ + default boolean needsSynchronizing() { + return false; + } + + /** Skips triples to synchronize the row reading. */ + default void synchronize() {} } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 773e0f7a85d0..150e74235329 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -113,7 +113,6 @@ private static class FileIterator implements CloseableIterator { private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; - private final long[] rowGroupsStartRowPos; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -124,7 +123,6 @@ private static class FileIterator implements CloseableIterator { this.batchSize = conf.batchSize(); this.model.setBatchSize(this.batchSize); this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); - this.rowGroupsStartRowPos = conf.startRowPositions(); } @Override @@ -160,13 +158,12 @@ private void advance() { } PageReadStore pages; try { - pages = reader.readNextRowGroup(); + pages = reader.readNextFilteredRowGroup(); } catch (IOException e) { throw new RuntimeIOException(e); } - long rowPosition = rowGroupsStartRowPos[nextRowGroup]; - model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition); + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java index 72b1e39e9634..1b576929d835 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java @@ -43,10 +43,25 @@ public interface VectorizedReader { * @param pages row group information for all the columns * @param metadata map of {@link ColumnPath} -> {@link 
ColumnChunkMetaData} for the row group * @param rowPosition the row group's row offset in the parquet file + * @deprecated since 1.6.0, will be removed in 1.7.0; use setRowGroupInfo(PageReadStore, + * Map<ColumnPath, ColumnChunkMetaData>) instead. */ + @Deprecated void setRowGroupInfo( PageReadStore pages, Map metadata, long rowPosition); + /** + * Sets the row group information to be used with this reader + * + * @param pages row group information for all the columns + * @param metadata map of {@link ColumnPath} -> {@link ColumnChunkMetaData} for the row group + */ + default void setRowGroupInfo(PageReadStore pages, Map metadata) { + throw new UnsupportedOperationException( + this.getClass().getName() + + " doesn't implement setRowGroupInfo(PageReadStore, Map)"); + } + /** Release any resources allocated. */ void close(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index af16d9bbc290..b54dd1edcca4 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -132,7 +132,9 @@ private static class ReadBuilder extends TypeWithSchemaVisitor message( Types.StructType expected, MessageType message, List> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); + StructReader struct = (StructReader) struct(expected, message.asGroupType(), fieldReaders); + struct.topLevel(); + return struct; } @Override diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f07d8c545e35..ebc4c480c02f 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -55,8 +55,14 @@ public ColumnarBatchReader(List> readers) { @Override public void setRowGroupInfo( PageReadStore pageStore, Map metaData, long rowPosition) { - super.setRowGroupInfo(pageStore, metaData, rowPosition); - this.rowStartPosInBatch = rowPosition; + setRowGroupInfo(pageStore, metaData); + } + + @Override + public void setRowGroupInfo( + PageReadStore pageStore, Map metaData) { + super.setRowGroupInfo(pageStore, metaData); + this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L); } public void setDeleteFilter(DeleteFilter deleteFilter) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index af16d9bbc290..b54dd1edcca4 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -132,7 +132,9 @@ private static class ReadBuilder extends TypeWithSchemaVisitor message( Types.StructType expected, MessageType message, List> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); + StructReader struct = (StructReader) struct(expected, message.asGroupType(), fieldReaders); + struct.topLevel(); + return struct; } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f07d8c545e35..ebc4c480c02f 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -55,8 +55,14 @@ public ColumnarBatchReader(List> readers) { @Override public void setRowGroupInfo( PageReadStore pageStore, Map metaData, long rowPosition) { - super.setRowGroupInfo(pageStore, metaData, rowPosition); - this.rowStartPosInBatch = rowPosition; + setRowGroupInfo(pageStore, metaData); + } + + @Override + public void setRowGroupInfo( + PageReadStore pageStore, Map metaData) { + super.setRowGroupInfo(pageStore, metaData); + this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L); } public void setDeleteFilter(DeleteFilter deleteFilter) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index af16d9bbc290..b54dd1edcca4 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -132,7 +132,9 @@ private static class ReadBuilder extends TypeWithSchemaVisitor message( Types.StructType expected, MessageType message, List> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); + StructReader struct = (StructReader) struct(expected, message.asGroupType(), fieldReaders); + struct.topLevel(); + return struct; } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f07d8c545e35..ebc4c480c02f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -55,8 +55,14 @@ public ColumnarBatchReader(List> readers) { @Override public void setRowGroupInfo( PageReadStore pageStore, Map metaData, long rowPosition) { - super.setRowGroupInfo(pageStore, metaData, rowPosition); - this.rowStartPosInBatch = rowPosition; + setRowGroupInfo(pageStore, metaData); + } + + @Override + public void setRowGroupInfo( + PageReadStore pageStore, Map metaData) { + super.setRowGroupInfo(pageStore, metaData); + this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L); } public void setDeleteFilter(DeleteFilter deleteFilter) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetPageSkipping.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetPageSkipping.java new file mode 100644 index 000000000000..6aa9803e7e2a --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetPageSkipping.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data; + +import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetAvroWriter; +import org.apache.iceberg.relocated.com.google.common.base.Function; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSparkParquetPageSkipping { + + private static final Types.StructType PRIMITIVES = + Types.StructType.of( + required(0, "_long", Types.LongType.get()), + optional(1, "_string", Types.StringType.get()), // var width + required(2, "_bool", Types.BooleanType.get()), + optional(3, "_int", Types.IntegerType.get()), + optional(4, "_float", Types.FloatType.get()), + required(5, "_double", Types.DoubleType.get()), + optional(6, "_date", Types.DateType.get()), + required(7, "_ts", Types.TimestampType.withZone()), + required(8, "_fixed", Types.FixedType.ofLength(7)), + optional(9, "_bytes", Types.BinaryType.get()), // var width + required(10, "_dec_9_0", Types.DecimalType.of(9, 0)), // int + required(11, "_dec_11_2", Types.DecimalType.of(11, 2)), // long + required(12, "_dec_38_10", Types.DecimalType.of(38, 10)) // fixed + ); + + private static final Schema PRIMITIVES_SCHEMA = new Schema(PRIMITIVES.fields()); + + private static final 
Types.StructType LIST = + Types.StructType.of( + optional(13, "_list", Types.ListType.ofOptional(14, Types.StringType.get()))); + private static final Types.StructType MAP = + Types.StructType.of( + optional( + 15, + "_map", + Types.MapType.ofOptional(16, 17, Types.StringType.get(), Types.StringType.get()))); + private static final Schema COMPLEX_SCHEMA = + new Schema( + Lists.newArrayList(Iterables.concat(PRIMITIVES.fields(), LIST.fields(), MAP.fields()))); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + private File testFile; + private List allRecords = Lists.newArrayList(); + private List rowGroup0; + + /* Column and offset indexes info of `_long` column in `testFile` copied from text printed by parquet-cli's + column-index command: + + row-group 0: + column index for column _long: + Boudary order: ASCENDING + null count min max + page-0 0 0 56 + page-1 0 57 113 + page-2 0 114 170 + page-3 0 171 227 + page-4 0 228 284 + page-5 0 285 341 + page-6 0 342 398 + page-7 0 399 455 + page-8 0 456 512 + page-9 0 513 569 + page-10 0 570 592 + + offset index for column _long: + offset compressed size first row index + page-0 4 137 0 + page-1 141 138 57 + page-2 279 137 114 + page-3 416 138 171 + page-4 554 137 228 + page-5 691 141 285 + page-6 832 140 342 + page-7 972 141 399 + page-8 1113 141 456 + page-9 1254 140 513 + page-10 1394 92 570 + + + row-group 1: + column index for column _long: + Boudary order: ASCENDING + null count min max + page-0 0 593 649 + page-1 0 650 706 + page-2 0 707 763 + page-3 0 764 820 + page-4 0 821 877 + page-5 0 878 934 + page-6 0 935 991 + page-7 0 992 999 + + offset index for column _long: + offset compressed size first row index + page-0 498681 140 0 + page-1 498821 140 57 + page-2 498961 141 114 + page-3 499102 141 171 + page-4 499243 141 228 + page-5 499384 140 285 + page-6 499524 142 342 + page-7 499666 68 399 + */ + + private long index = -1; + private static final int ABOVE_INT_COL_MAX_VALUE = Integer.MAX_VALUE; + + @Before + public void generateFile() throws IOException { + testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + Function transform = + record -> { + index += 1; + if (record.get("_long") != null) { + record.put("_long", index); + } + + if (Objects.equals(record.get("_int"), ABOVE_INT_COL_MAX_VALUE)) { + record.put("_int", ABOVE_INT_COL_MAX_VALUE - 1); + } + + return record; + }; + + int numRecords = 1000; + allRecords = + RandomData.generateList(COMPLEX_SCHEMA, numRecords, 0).stream() + .map(transform) + .collect(Collectors.toList()); + rowGroup0 = selectRecords(allRecords, Pair.of(0, 593)); + + try (FileAppender writer = + Parquet.write(Files.localOutput(testFile)) + .createWriterFunc(ParquetAvroWriter::buildWriter) + .schema(COMPLEX_SCHEMA) + .set(PARQUET_PAGE_SIZE_BYTES, "500") + .set(PARQUET_ROW_GROUP_SIZE_BYTES, "500000") // 2 row groups + .set(PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, "1") + .set(PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, "1") + .set(PARQUET_DICT_SIZE_BYTES, "1") + .named("pages_unaligned_file") + .build()) { + writer.addAll(allRecords); + } + } + + @Parameterized.Parameters(name = "vectorized = {0}") + public static Object[] parameters() { + return new Object[] {false, true}; + } + + private final boolean vectorized; + + public TestSparkParquetPageSkipping(boolean vectorized) { + this.vectorized = vectorized; + } + + @Test + public void testSinglePageMatch() { + Expression filter = + Expressions.and( + Expressions.greaterThanOrEqual("_long", 57), + 
Expressions.lessThan("_long", 114)); // exactly page-1 -> row ranges: [57, 113]
+
+    List<GenericData.Record> expected = selectRecords(allRecords, Pair.of(57, 114));
+    readAndValidate(filter, expected, rowGroup0);
+  }
+
+  @Test
+  public void testMultiplePagesMatch() {
+    Expression filter =
+        Expressions.or(
+            // page-1 -> row ranges: [57, 113]
+            Expressions.and(
+                Expressions.greaterThanOrEqual("_long", 57), Expressions.lessThan("_long", 114)),
+
+            // page-3, page-4 in row group 0 -> row ranges: [171, 284]
+            Expressions.and(
+                Expressions.greaterThanOrEqual("_long", 173), Expressions.lessThan("_long", 260)));
+
+    List<GenericData.Record> expected =
+        selectRecords(allRecords, Pair.of(57, 114), Pair.of(171, 285));
+    readAndValidate(filter, expected, rowGroup0);
+  }
+
+  @Test
+  public void testMultipleRowGroupsMatch() {
+    Expression filter =
+        Expressions.or(
+            // page-1 -> row ranges: [57, 113]
+            Expressions.and(
+                Expressions.greaterThanOrEqual("_long", 57), Expressions.lessThan("_long", 114)),
+
+            // page-3, page-4 in row group 0 -> row ranges: [171, 284]
+            Expressions.and(
+                Expressions.greaterThanOrEqual("_long", 173), Expressions.lessThan("_long", 260)));
+
+    filter =
+        Expressions.or(
+            filter,
+            // page-10 in row group 0 and page-0, page-1 in row group 1 -> row ranges: [570, 706]
+            Expressions.and(
+                Expressions.greaterThanOrEqual("_long", 572), Expressions.lessThan("_long", 663)));
+
+    List<GenericData.Record> expected =
+        selectRecords(allRecords, Pair.of(57, 114), Pair.of(171, 285), Pair.of(570, 707));
+    readAndValidate(filter, expected, allRecords);
+  }
+
+  @Test
+  public void testOnlyFilterPagesOnOneRowGroup() {
+    Expression filter =
+        Expressions.and(
+            Expressions.greaterThanOrEqual("_long", 57),
+            Expressions.lessThan("_long", 114)); // exactly page-1 -> row ranges: [57, 113]
+
+    filter =
+        Expressions.or(
+            filter,
+            // page-9, page-10 in row group 0 -> row ranges: [513, 592]
+            // and all pages in row group 1
+            Expressions.greaterThanOrEqual("_long", 569));
+
+    // some pages of row group 0 and all pages of row group 1
+    List<GenericData.Record> expected =
+        selectRecords(allRecords, Pair.of(57, 114), Pair.of(513, 593), Pair.of(593, 1000));
+
+    readAndValidate(filter, expected, allRecords);
+  }
+
+  @Test
+  public void testNoRowsMatch() {
+    Expression filter =
+        Expressions.and(
+            Expressions.and(
+                Expressions.greaterThan("_long", 40), Expressions.lessThan("_long", 46)),
+            Expressions.equal("_int", ABOVE_INT_COL_MAX_VALUE));
+
+    readAndValidate(filter, ImmutableList.of(), ImmutableList.of());
+  }
+
+  @Test
+  public void testAllRowsMatch() {
+    Expression filter = Expressions.greaterThanOrEqual("_long", Long.MIN_VALUE);
+    readAndValidate(filter, allRecords, allRecords);
+  }
+
+  private Schema readSchema() {
+    return vectorized ? PRIMITIVES_SCHEMA : COMPLEX_SCHEMA;
+  }
+
+  private void readAndValidate(
+      Expression filter,
+      List<GenericData.Record> expected,
+      List<GenericData.Record> vectorizedExpected) {
+    Schema projected = readSchema();
+
+    Parquet.ReadBuilder builder =
+        Parquet.read(Files.localInput(testFile)).project(projected).filter(filter);
+
+    Types.StructType struct = projected.asStruct();
+
+    if (vectorized) {
+      CloseableIterable<ColumnarBatch> batches =
+          builder
+              .createBatchedReaderFunc(
+                  type ->
+                      VectorizedSparkParquetReaders.buildReader(
+                          projected, type, ImmutableMap.of(), null))
+              .build();
+
+      Iterator<GenericData.Record> expectedIterator = vectorizedExpected.iterator();
+      for (ColumnarBatch batch : batches) {
+        TestHelpers.assertEqualsBatch(struct, expectedIterator, batch);
+      }
+
+      Assert.assertFalse(
+          "The expected records is more than the actual result", expectedIterator.hasNext());
+    } else {
+      CloseableIterable<InternalRow> reader =
+          builder
+              .createReaderFunc(type -> SparkParquetReaders.buildReader(projected, type))
+              .build();
+      CloseableIterator<InternalRow> actualRows = reader.iterator();
+
+      for (GenericData.Record record : expected) {
+        Assert.assertTrue("Should have expected number of rows", actualRows.hasNext());
+        InternalRow row = actualRows.next();
+        TestHelpers.assertEqualsUnsafe(struct, record, row);
+      }
+
+      Assert.assertFalse("Should not have extra rows", actualRows.hasNext());
+    }
+  }
+
+  private List<GenericData.Record> selectRecords(
+      List<GenericData.Record> records, Pair<Integer, Integer>... ranges) {
+    return Arrays.stream(ranges)
+        .map(range -> records.subList(range.first(), range.second()))
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+}
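
Note on the mechanism (not part of the patch): readFilteredRowGroup() may drop different pages for different columns, so before the top-level StructReader assembles a row, each column iterator has to fast-forward to the next surviving row index exposed by PageReadStore.getRowIndexes(). That is what ColumnIterator.synchronize() does, using repetition levels to detect row boundaries and definition levels to decide which skipped triples actually carry values. The sketch below is a simplified, self-contained illustration of that bookkeeping under the assumption of a flat, required column (exactly one value per row); the class and method names are invented for the example and do not appear in the change.

import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

// Simplified illustration of page-skipping synchronization: given the row index the
// column reader is currently positioned at and the ascending iterator of row indexes
// that survived the filter, compute how many leading rows can be skipped in bulk.
// The real ColumnIterator.synchronize() does this triple by triple and also handles
// nested and optional columns via repetition/definition levels.
public class PageSkipSketch {

  // Returns the number of rows to skip so that the next row read is the first surviving
  // row at or after currentRowIndex, or -1 if no surviving row remains.
  static long rowsToSkip(long currentRowIndex, PrimitiveIterator.OfLong survivingRowIndexes) {
    while (survivingRowIndexes.hasNext()) {
      long candidate = survivingRowIndexes.nextLong();
      if (candidate >= currentRowIndex) {
        return candidate - currentRowIndex;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // Example mirroring testSinglePageMatch: rows 57..113 survive the filter and the page
    // starts at row index 0, so the first 57 values of the page can be skipped before reading.
    PrimitiveIterator.OfLong surviving = LongStream.rangeClosed(57, 113).iterator();
    System.out.println(rowsToSkip(0, surviving)); // prints 57
  }
}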