diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index 822ca8973f54..5e1e35df3fc9 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.arrow.vectorized.parquet; +import java.util.Optional; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; import org.apache.iceberg.arrow.vectorized.NullabilityHolder; @@ -55,7 +56,7 @@ public Dictionary setRowGroupInfo(PageReader store, boolean allPagesDictEncoded) // setPageSource can result in a data page read. If that happens, we need // to know in advance whether all the pages in the row group are dictionary encoded or not this.vectorizedPageIterator.setAllPagesDictEncoded(allPagesDictEncoded); - super.setPageSource(store); + super.setPageSource(store, Optional.empty()); return dictionary; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java index 647397fad670..199de07160d8 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java @@ -18,14 +18,17 @@ */ package org.apache.iceberg.parquet; +import java.util.Optional; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.page.DataPage; import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; @SuppressWarnings("checkstyle:VisibilityModifier") public abstract class BaseColumnIterator { protected final ColumnDescriptor desc; + protected final int definitionLevel; // state reset for each row group protected PageReader pageSource = null; @@ -34,15 +37,28 @@ public abstract class BaseColumnIterator { protected long advanceNextPageCount = 0L; protected Dictionary dictionary; + // state for page skipping + protected boolean needsSynchronize; + protected long currentRowIndex; + protected int skipValues; + protected BaseColumnIterator(ColumnDescriptor descriptor) { this.desc = descriptor; + this.definitionLevel = desc.getMaxDefinitionLevel() - 1; } + @Deprecated public void setPageSource(PageReader source) { + setPageSource(source, Optional.empty()); + } + + public void setPageSource(PageReader source, Optional rowRanges) { this.pageSource = source; this.triplesCount = source.getTotalValueCount(); this.triplesRead = 0L; this.advanceNextPageCount = 0L; + this.needsSynchronize = rowRanges.isPresent(); + BasePageIterator pageIterator = pageIterator(); pageIterator.reset(); dictionary = ParquetUtil.readDictionary(desc, pageSource); @@ -60,6 +76,17 @@ protected void advance() { if (page != null) { pageIterator.setPage(page); this.advanceNextPageCount += pageIterator.currentPageCount(); + + if (needsSynchronize) { + long firstRowIndex = + page.getFirstRowIndex() + .orElseThrow( + () -> + new IllegalArgumentException( + "Missing page first row index for synchronizing values")); + this.skipValues = 0; + this.currentRowIndex = firstRowIndex - 1; + } } else { return; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java index 75989e8f649b..b948458fd6a3 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java @@ -71,6 +71,10 @@ protected void reset() { this.hasNext = false; } + protected void skip(int skipValues) { + throw new UnsupportedOperationException(); + } + protected abstract void initDataReader( Encoding dataEncoding, ByteBufferInputStream in, int valueCount); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java index 1c0ea4829eb8..622b8e5921dc 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java @@ -18,7 +18,11 @@ */ package org.apache.iceberg.parquet; +import java.util.Optional; +import java.util.PrimitiveIterator; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.io.api.Binary; public abstract class ColumnIterator extends BaseColumnIterator implements TripleIterator { @@ -89,6 +93,8 @@ public Binary next() { } private final PageIterator pageIterator; + private PrimitiveIterator.OfLong rowIndexes; + private long targetRowIndex = Long.MIN_VALUE; private ColumnIterator(ColumnDescriptor desc, String writerVersion) { super(desc); @@ -160,4 +166,50 @@ public N nextNull() { protected BasePageIterator pageIterator() { return pageIterator; } + + @Override + public void setPageSource(PageReader source) { + setPageSource(source, Optional.empty()); + } + + @Override + public void setPageSource(PageReader source, Optional rowRanges) { + super.setPageSource(source, rowRanges); + if (rowRanges.isPresent()) { + this.rowIndexes = rowRanges.get().iterator(); + this.targetRowIndex = Long.MIN_VALUE; + } + } + + @Override + public boolean needsSynchronize() { + return needsSynchronize; + } + + @Override + public void synchronize() { + skipValues = 0; + while (hasNext()) { + advance(); + if (pageIterator.currentRepetitionLevel() == 0) { + currentRowIndex += 1; + if (currentRowIndex > targetRowIndex) { + targetRowIndex = rowIndexes.hasNext() ? 
rowIndexes.nextLong() : Long.MAX_VALUE; + } + } + + if (currentRowIndex < targetRowIndex) { + triplesRead += 1; + if (pageIterator.currentDefinitionLevel() > definitionLevel) { + skipValues += 1; + } + + pageIterator.advance(); + } else { + break; + } + } + + pageIterator.skip(skipValues); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java index 34383352bf68..a79445d25607 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java @@ -189,7 +189,12 @@ public V nextNull() { return null; } - private void advance() { + @Override + protected void skip(int skipValues) { + values.skip(skipValues); + } + + protected void advance() { if (triplesRead < triplesCount) { this.currentDL = definitionLevels.nextInt(); this.currentRL = repetitionLevels.nextInt(); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index d240c84b9e4d..877a3d7e4923 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -978,6 +978,7 @@ public static class ReadBuilder { private NameMapping nameMapping = null; private ByteBuffer fileEncryptionKey = null; private ByteBuffer fileAADPrefix = null; + private boolean useColumnIndexFilter = false; private ReadBuilder(InputFile file) { this.file = file; @@ -1020,6 +1021,11 @@ public ReadBuilder filter(Expression newFilter) { return this; } + public ReadBuilder useColumnIndexFilter(boolean newUseColumnIndexFilter) { + this.useColumnIndexFilter = newUseColumnIndexFilter; + return this; + } + public ReadBuilder readSupport(ReadSupport newFilterSupport) { this.readSupport = newFilterSupport; return this; @@ -1139,7 +1145,8 @@ public CloseableIterable build() { nameMapping, filter, reuseContainers, - caseSensitive); + caseSensitive, + useColumnIndexFilter); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetColumnIndexFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetColumnIndexFilter.java new file mode 100644 index 000000000000..d3e3b3a937d4 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetColumnIndexFilter.java @@ -0,0 +1,764 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.parquet;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.IntPredicate;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.BoundReference;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionVisitors;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.IndexIterator;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParquetColumnIndexFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetColumnIndexFilter.class);
+
+  private final Schema schema;
+  private final Expression expr;
+
+  public ParquetColumnIndexFilter(Schema schema, Expression unbound, boolean caseSensitive) {
+    this.schema = schema;
+    this.expr = Binder.bind(schema.asStruct(), Expressions.rewriteNot(unbound), caseSensitive);
+  }
+
+  /**
+   * Calculates the row ranges containing the indexes of the rows that might match the expression.
+   *
+   * @param fileSchema schema of the file
+   * @param columnIndexStore the store for providing column/offset indexes
+   * @param rowCount the total number of rows in the row-group
+   * @return the ranges of the possible matching row indexes; the returned ranges will contain all
+   *     the rows if any required offset index is missing
+   */
+  public RowRanges calculateRowRanges(
+      MessageType fileSchema, ColumnIndexStore columnIndexStore, long rowCount) {
+    try {
+      return new ColumnIndexEvalVisitor(fileSchema, columnIndexStore, rowCount).eval();
+    } catch (ColumnIndexStore.MissingOffsetIndexException e) {
+      LOG.info("Cannot get required offset index; Unable to filter on this row group", e);
+      return RowRanges.createSingle(rowCount);
+    }
+  }
+
+  private static final boolean ROWS_MIGHT_MATCH = true;
+  private static final boolean ROWS_CANNOT_MATCH = false;
+  private static final RowRanges NO_ROWS = RowRanges.EMPTY;
+
+  private class ColumnIndexEvalVisitor
+      extends ExpressionVisitors.BoundExpressionVisitor<RowRanges> {
+
+    private final Map<Integer, ColumnPath> idToColumn = Maps.newHashMap();
+    private final Map<Integer, ParquetColumnIndex> idToColumnIndex = Maps.newHashMap();
+    private final Map<Integer, OffsetIndex> idToOffsetIndex = Maps.newHashMap();
+    private final Map<Integer, PrimitiveType> parquetTypes = Maps.newHashMap();
+    private final Map<Integer, Type.PrimitiveType> icebergTypes = Maps.newHashMap();
+
+    private final RowRanges allRows;
+    private final ColumnIndexStore columnIndexStore;
+    private final long rowCount;
+
+    private ColumnIndexEvalVisitor(
+        MessageType fileSchema, ColumnIndexStore columnIndexStore, long rowCount) {
+      this.allRows = RowRanges.createSingle(rowCount);
+      this.columnIndexStore = columnIndexStore;
+      this.rowCount = rowCount;
+
+      for (ColumnDescriptor desc : fileSchema.getColumns()) {
+        String[] path = desc.getPath();
+        PrimitiveType colType = fileSchema.getType(path).asPrimitiveType();
+        if (colType.getId() != null) {
+          int id = colType.getId().intValue();
+          parquetTypes.put(id, colType);
+          Type type = schema.findType(id);
+          if (type != null) {
+            icebergTypes.put(id, type.asPrimitiveType());
+          }
+
+          idToColumn.put(id, ColumnPath.get(path));
+        }
+      }
+    }
+
+    private RowRanges eval() {
+      return ExpressionVisitors.visit(expr, this);
+    }
+
+    @Override
+    public RowRanges alwaysTrue() {
+      return allRows;
+    }
+
+    @Override
+    public RowRanges alwaysFalse() {
+      return NO_ROWS;
+    }
+
+    @Override
+    public RowRanges not(RowRanges result) {
+      // The resulting row ranges for column index filter calculations are overestimated,
+      // so evaluation of NOT expressions is not supported
+      throw new UnsupportedOperationException("Cannot support evaluating NOT");
+    }
+
+    @Override
+    public RowRanges and(RowRanges left, RowRanges right) {
+      return RowRanges.intersection(left, right);
+    }
+
+    @Override
+    public RowRanges or(RowRanges left, RowRanges right) {
+      return RowRanges.union(left, right);
+    }
+
+    @Override
+    public <T> RowRanges isNull(BoundReference<T> ref) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            if (columnIndex.hasNullCounts()) {
+              return IndexIterator.filter(columnIndex.pageCount(), columnIndex::containsNull);
+            } else {
+              // Searching for nulls, so if we don't have null-related statistics we have to
+              // return all pages
+              return IndexIterator.all(columnIndex.pageCount());
+            }
+          };
+
+      return applyPredicate(id, func, ROWS_MIGHT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges notNull(BoundReference<T> ref) {
+      int id = ref.fieldId();
+
+      // When filtering nested types notNull() is an implicit filter passed even though complex
+      // filters aren't pushed down in Parquet. Leave all nested column type filters to be
+      // evaluated post scan.
+      if (schema.findType(id) instanceof Type.NestedType) {
+        return allRows;
+      }
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> IndexIterator.filter(columnIndex.pageCount(), columnIndex::isNonNullPage);
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges isNaN(BoundReference<T> ref) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> IndexIterator.filter(columnIndex.pageCount(), columnIndex::isNonNullPage);
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges notNaN(BoundReference<T> ref) {
+      // Parquet column index does not contain statistics about NaN values, so we cannot filter
+      // out any pages.
+      return allRows;
+    }
+
+    @Override
+    public <T> RowRanges lt(BoundReference<T> ref, Literal<T> lit) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            IntPredicate filter =
+                pageIndex -> {
+                  if (columnIndex.isNullPage(pageIndex)) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T lower = columnIndex.min(pageIndex);
+                  if (lit.comparator().compare(lower, lit.value()) >= 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  return ROWS_MIGHT_MATCH;
+                };
+
+            return IndexIterator.filter(columnIndex.pageCount(), filter);
+          };
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges ltEq(BoundReference<T> ref, Literal<T> lit) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            IntPredicate filter =
+                pageIndex -> {
+                  if (columnIndex.isNullPage(pageIndex)) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T lower = columnIndex.min(pageIndex);
+                  if (lit.comparator().compare(lower, lit.value()) > 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  return ROWS_MIGHT_MATCH;
+                };
+
+            return IndexIterator.filter(columnIndex.pageCount(), filter);
+          };
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges gt(BoundReference<T> ref, Literal<T> lit) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            IntPredicate filter =
+                pageIndex -> {
+                  if (columnIndex.isNullPage(pageIndex)) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T upper = columnIndex.max(pageIndex);
+                  if (lit.comparator().compare(upper, lit.value()) <= 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  return ROWS_MIGHT_MATCH;
+                };
+
+            return IndexIterator.filter(columnIndex.pageCount(), filter);
+          };
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges gtEq(BoundReference<T> ref, Literal<T> lit) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            IntPredicate filter =
+                pageIndex -> {
+                  if (columnIndex.isNullPage(pageIndex)) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T upper = columnIndex.max(pageIndex);
+                  if (lit.comparator().compare(upper, lit.value()) < 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  return ROWS_MIGHT_MATCH;
+                };
+
+            return IndexIterator.filter(columnIndex.pageCount(), filter);
+          };
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges eq(BoundReference<T> ref, Literal<T> lit) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            IntPredicate filter =
+                pageIndex -> {
+                  if (columnIndex.isNullPage(pageIndex)) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T lower = columnIndex.min(pageIndex);
+                  if (lit.comparator().compare(lower, lit.value()) > 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T upper = columnIndex.max(pageIndex);
+                  if (lit.comparator().compare(upper, lit.value()) < 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  return ROWS_MIGHT_MATCH;
+                };
+
+            return IndexIterator.filter(columnIndex.pageCount(), filter);
+          };
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges notEq(BoundReference<T> ref, Literal<T> lit) {
+      return allRows;
+    }
+
+    @Override
+    public <T> RowRanges in(BoundReference<T> ref, Set<T> literalSet) {
+      int id = ref.fieldId();
+      Pair<T, T> minMax = minMax(ref.comparator(), literalSet);
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            IntPredicate filter =
+                pageIndex -> {
+                  if (columnIndex.isNullPage(pageIndex)) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T lower = columnIndex.min(pageIndex);
+                  if (ref.comparator().compare(lower, minMax.second()) > 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  T upper = columnIndex.max(pageIndex);
+                  if (ref.comparator().compare(upper, minMax.first()) < 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  return ROWS_MIGHT_MATCH;
+                };
+
+            return IndexIterator.filter(columnIndex.pageCount(), filter);
+          };
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    private <T> Pair<T, T> minMax(Comparator<T> comparator, Set<T> literalSet) {
+      T min = null;
+      T max = null;
+
+      for (T item : literalSet) {
+        if (min == null) {
+          min = item;
+          max = item;
+        } else {
+          if (comparator.compare(item, min) < 0) {
+            min = item;
+          } else if (comparator.compare(item, max) > 0) {
+            max = item;
+          }
+        }
+      }
+
+      return Pair.of(min, max);
+    }
+
+    @Override
+    public <T> RowRanges notIn(BoundReference<T> ref, Set<T> literalSet) {
+      return allRows;
+    }
+
+    @Override
+    public <T> RowRanges startsWith(BoundReference<T> ref, Literal<T> lit) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            ByteBuffer prefixAsBytes = lit.toByteBuffer();
+            Comparator<ByteBuffer> comparator = Comparators.unsignedBytes();
+
+            IntPredicate filter =
+                pageIndex -> {
+                  if (columnIndex.isNullPage(pageIndex)) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  ByteBuffer lower = columnIndex.minBuffer(pageIndex);
+
+                  // truncate lower bound so that its length in bytes is not greater than the
+                  // length of prefix
+                  int lowerLength = Math.min(prefixAsBytes.remaining(), lower.remaining());
+                  int lowerCmp =
+                      comparator.compare(
+                          BinaryUtil.truncateBinary(lower, lowerLength), prefixAsBytes);
+                  if (lowerCmp > 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  ByteBuffer upper = columnIndex.maxBuffer(pageIndex);
+                  // truncate upper bound so that its length in bytes is not greater than the
+                  // length of prefix
+                  int upperLength = Math.min(prefixAsBytes.remaining(), upper.remaining());
+                  int upperCmp =
+                      comparator.compare(
+                          BinaryUtil.truncateBinary(upper, upperLength), prefixAsBytes);
+                  if (upperCmp < 0) {
+                    return ROWS_CANNOT_MATCH;
+                  }
+
+                  return ROWS_MIGHT_MATCH;
+                };
+
+            return IndexIterator.filter(columnIndex.pageCount(), filter);
+          };
+
+      return applyPredicate(id, func, ROWS_CANNOT_MATCH);
+    }
+
+    @Override
+    public <T> RowRanges notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+      int id = ref.fieldId();
+
+      Function<ParquetColumnIndex, PrimitiveIterator.OfInt> func =
+          columnIndex -> {
+            IntPredicate filter;
+            if (columnIndex.hasNullCounts()) {
+              ByteBuffer prefixAsBytes = lit.toByteBuffer();
+              Comparator<ByteBuffer> comparator = Comparators.unsignedBytes();
+
+              filter =
+                  pageIndex -> {
+                    if (columnIndex.containsNull(pageIndex)) {
+                      return ROWS_MIGHT_MATCH;
+                    }
+
+                    ByteBuffer lower = columnIndex.minBuffer(pageIndex);
+                    // if lower is shorter than the prefix, it can't start with the prefix
+                    if (lower.remaining() < prefixAsBytes.remaining()) {
+                      return ROWS_MIGHT_MATCH;
+                    }
+
+                    // truncate lower bound so that its length in bytes is not greater than the
+                    // length of prefix
+                    int cmp =
+                        comparator.compare(
+                            BinaryUtil.truncateBinary(lower,
prefixAsBytes.remaining()), + prefixAsBytes); + + if (cmp == 0) { + ByteBuffer upper = columnIndex.maxBuffer(pageIndex); + // the lower bound starts with the prefix; check the upper bound + // if upper is shorter than the prefix, it can't start with the prefix + if (upper.remaining() < prefixAsBytes.remaining()) { + return ROWS_MIGHT_MATCH; + } + + // truncate upper bound so that its length in bytes is not greater than the + // length of prefix + cmp = + comparator.compare( + BinaryUtil.truncateBinary(upper, prefixAsBytes.remaining()), + prefixAsBytes); + if (cmp == 0) { + // both bounds match the prefix, so all rows must match the prefix and none + // do not match + return ROWS_CANNOT_MATCH; + } + } + + return ROWS_MIGHT_MATCH; + }; + } else { + // Return all pages if we don't have null counts statistics + filter = pageIndex -> ROWS_MIGHT_MATCH; + } + + return IndexIterator.filter(columnIndex.pageCount(), filter); + }; + + return applyPredicate(id, func, ROWS_MIGHT_MATCH); + } + + private RowRanges applyPredicate( + int columnId, + Function func, + boolean missingColumnMightMatch) { + + if (!idToColumn.containsKey(columnId)) { + return missingColumnMightMatch ? allRows : NO_ROWS; + } + + // If the column index of a column is not available, we cannot filter on this column. + // If the offset index of a column is not available, a MissingOffsetIndexException will + // be thrown out, and we cannot filter on this row group. + OffsetIndex offsetIndex = offsetIndex(columnId); + ParquetColumnIndex columnIndex = columnIndex(columnId); + if (columnIndex == null) { + LOG.info( + "No column index for column {} is available; Unable to filter on this column", + idToColumn.get(columnId)); + return allRows; + } + + return RowRanges.create(rowCount, func.apply(columnIndex), offsetIndex); + } + + // Assumes that the column corresponding to the id exists in the file. + private OffsetIndex offsetIndex(int columnId) { + return idToOffsetIndex.computeIfAbsent( + columnId, k -> columnIndexStore.getOffsetIndex(idToColumn.get(k))); + } + + // Assumes that the column corresponding to the id exists in the file. + private ParquetColumnIndex columnIndex(int columnId) { + ParquetColumnIndex wrapper = idToColumnIndex.get(columnId); + + if (wrapper == null) { + ColumnIndex columnIndex = columnIndexStore.getColumnIndex(idToColumn.get(columnId)); + if (columnIndex != null) { + wrapper = + new ParquetColumnIndex( + columnIndex, parquetTypes.get(columnId), icebergTypes.get(columnId)); + idToColumnIndex.put(columnId, wrapper); + } + } + + return wrapper; + } + } + + /** + * A wrapper for ColumnIndex, which will cache statistics data and convert min max buffers to + * Iceberg type values. 
+ */ + private static class ParquetColumnIndex { + private final ColumnIndex columnIndex; + private final PrimitiveType primitiveType; + private final Type.PrimitiveType icebergType; + + private List nullPages; + private List minBuffers; + private List maxBuffers; + private List nullCounts; // optional field + + private ParquetColumnIndex( + ColumnIndex columnIndex, PrimitiveType primitiveType, Type.PrimitiveType icebergType) { + this.columnIndex = columnIndex; + this.primitiveType = primitiveType; + this.icebergType = icebergType; + } + + private ByteBuffer minBuffer(int pageIndex) { + if (minBuffers == null) { + minBuffers = columnIndex.getMinValues(); + } + + return minBuffers.get(pageIndex); + } + + private ByteBuffer maxBuffer(int pageIndex) { + if (maxBuffers == null) { + maxBuffers = columnIndex.getMaxValues(); + } + + return maxBuffers.get(pageIndex); + } + + private List nullPages() { + if (nullPages == null) { + nullPages = columnIndex.getNullPages(); + } + + return nullPages; + } + + private T min(int pageIndex) { + return fromBytes(minBuffer(pageIndex)); + } + + private T max(int pageIndex) { + return fromBytes(maxBuffer(pageIndex)); + } + + private Boolean isNullPage(int pageIndex) { + return nullPages().get(pageIndex); + } + + private Boolean isNonNullPage(int pageIndex) { + return !nullPages().get(pageIndex); + } + + private boolean hasNullCounts() { + if (nullCounts == null) { + nullCounts = columnIndex.getNullCounts(); + } + + return nullCounts != null; + } + + private boolean containsNull(int pageIndex) { + if (hasNullCounts()) { + return nullCounts.get(pageIndex) > 0; + } + + throw new UnsupportedOperationException("Has no null counts statistics"); + } + + private int pageCount() { + return nullPages().size(); + } + + @SuppressWarnings("unchecked") + private T fromBytes(ByteBuffer bytes) { + LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation(); + Optional converted = + logicalTypeAnnotation == null + ? 
Optional.empty() + : logicalTypeAnnotation.accept( + new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { + @Override + public Optional visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(StandardCharsets.UTF_8.decode(bytes)); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(StandardCharsets.UTF_8.decode(bytes)); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (primitiveType.getPrimitiveTypeName()) { + case INT32: + return Optional.of( + new BigDecimal( + BigInteger.valueOf(bytes.getInt(0)), decimalType.getScale())); + case INT64: + return Optional.of( + new BigDecimal( + BigInteger.valueOf(bytes.getLong(0)), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + new BigDecimal( + new BigInteger(ByteBuffers.toByteArray(bytes)), + decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + switch (timeLogicalType.getUnit()) { + case MILLIS: + return Optional.of(((long) bytes.getInt(0)) * 1000L); + case MICROS: + return Optional.of(bytes.getLong(0)); + case NANOS: + return Optional.of(Math.floorDiv(bytes.getLong(0), 1000)); + } + return Optional.empty(); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + switch (timestampLogicalType.getUnit()) { + case MILLIS: + return Optional.of(bytes.getLong(0) * 1000); + case MICROS: + return Optional.of(bytes.getLong(0)); + case NANOS: + return Optional.of(Math.floorDiv(bytes.getLong(0), 1000)); + } + return Optional.empty(); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(StandardCharsets.UTF_8.decode(bytes)); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit( + uuidLogicalType); + } + }); + + if (converted.isPresent()) { + return (T) converted.get(); + } + + switch (primitiveType.getPrimitiveTypeName()) { + case BOOLEAN: + return (T) (Boolean) (bytes.get() != 0); + case INT32: + Integer intValue = bytes.getInt(0); + if (icebergType.typeId() == Type.TypeID.LONG) { + return (T) (Long) intValue.longValue(); + } + return (T) intValue; + case INT64: + return (T) (Long) bytes.getLong(0); + case FLOAT: + Float floatValue = bytes.getFloat(0); + if (icebergType.typeId() == Type.TypeID.DOUBLE) { + return (T) (Double) floatValue.doubleValue(); + } + return (T) floatValue; + case DOUBLE: + return (T) (Double) bytes.getDouble(0); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return (T) bytes; + default: + throw new UnsupportedOperationException("Unsupported Parquet type: " + primitiveType); + } + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index c1d8b0ccbbad..23302169036c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.parquet; import java.io.IOException; +import java.util.Optional; import java.util.function.Function; import org.apache.iceberg.Schema; import 
org.apache.iceberg.exceptions.RuntimeIOException; @@ -32,6 +33,7 @@ import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.schema.MessageType; public class ParquetReader extends CloseableGroup implements CloseableIterable { @@ -43,6 +45,7 @@ public class ParquetReader extends CloseableGroup implements CloseableIterabl private final boolean reuseContainers; private final boolean caseSensitive; private final NameMapping nameMapping; + private final boolean useColumnIndexFilter; public ParquetReader( InputFile input, @@ -53,6 +56,28 @@ public ParquetReader( Expression filter, boolean reuseContainers, boolean caseSensitive) { + this( + input, + expectedSchema, + options, + readerFunc, + nameMapping, + filter, + reuseContainers, + caseSensitive, + false); + } + + public ParquetReader( + InputFile input, + Schema expectedSchema, + ParquetReadOptions options, + Function> readerFunc, + NameMapping nameMapping, + Expression filter, + boolean reuseContainers, + boolean caseSensitive, + boolean useColumnIndexFilter) { this.input = input; this.expectedSchema = expectedSchema; this.options = options; @@ -62,6 +87,7 @@ public ParquetReader( this.reuseContainers = reuseContainers; this.caseSensitive = caseSensitive; this.nameMapping = nameMapping; + this.useColumnIndexFilter = useColumnIndexFilter; } private ReadConf conf = null; @@ -79,7 +105,8 @@ private ReadConf init() { nameMapping, reuseContainers, caseSensitive, - null); + null, + useColumnIndexFilter); this.conf = readConf.copy(); return readConf; } @@ -100,6 +127,7 @@ private static class FileIterator implements CloseableIterator { private final long totalValues; private final boolean reuseContainers; private final long[] rowGroupsStartRowPos; + private final RowRanges[] rowRangesArr; private int nextRowGroup = 0; private long nextRowGroupStart = 0; @@ -113,6 +141,7 @@ private static class FileIterator implements CloseableIterator { this.totalValues = conf.totalValues(); this.reuseContainers = conf.reuseContainers(); this.rowGroupsStartRowPos = conf.startRowPositions(); + this.rowRangesArr = conf.rowRangesArr(); } @Override @@ -139,12 +168,16 @@ public T next() { private void advance() { while (shouldSkip[nextRowGroup]) { nextRowGroup += 1; - reader.skipNextRowGroup(); } PageReadStore pages; + Optional rowRanges = Optional.ofNullable(rowRangesArr[nextRowGroup]); try { - pages = reader.readNextRowGroup(); + if (rowRanges.isPresent()) { + pages = reader.readFilteredRowGroup(nextRowGroup, rowRanges.get()); + } else { + pages = reader.readRowGroup(nextRowGroup); + } } catch (IOException e) { throw new RuntimeIOException(e); } @@ -153,7 +186,7 @@ private void advance() { nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; - model.setPageSource(pages, rowPosition); + model.setPageSource(pages, rowPosition, rowRanges); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java index b6c2b5b70303..bf85ac9b94dd 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java @@ -19,7 +19,9 @@ package org.apache.iceberg.parquet; import java.util.List; +import java.util.Optional; import org.apache.parquet.column.page.PageReadStore; +import 
org.apache.parquet.internal.filter2.columnindex.RowRanges; public interface ParquetValueReader { T read(T reuse); @@ -29,4 +31,10 @@ public interface ParquetValueReader { List> columns(); void setPageSource(PageReadStore pageStore, long rowPosition); + + default void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement setPageSource"); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index c1f76e7bdb9a..c15794288ef4 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -27,11 +27,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.PrimitiveIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.Type; @@ -113,6 +116,10 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) {} + + @Override + public void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) {} } static class ConstantReader implements ParquetValueReader { @@ -176,16 +183,20 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) {} + + @Override + public void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) {} } static class PositionReader implements ParquetValueReader { private long rowOffset = -1; private long rowGroupStart; + private PrimitiveIterator.OfLong rowIndexes; @Override public Long read(Long reuse) { - rowOffset = rowOffset + 1; - return rowGroupStart + rowOffset; + return rowGroupStart + rowIndexes.nextLong(); } @Override @@ -200,8 +211,31 @@ public List> columns() { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { + this.setPageSource(pageStore, rowPosition, Optional.empty()); + } + + @Override + public void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) { this.rowGroupStart = rowPosition; this.rowOffset = -1; + if (rowRanges.isPresent()) { + this.rowIndexes = rowRanges.get().iterator(); + } else { + this.rowIndexes = + new PrimitiveIterator.OfLong() { + @Override + public long nextLong() { + rowOffset = rowOffset + 1; + return rowOffset; + } + + @Override + public boolean hasNext() { + return false; + } + }; + } } } @@ -221,7 +255,13 @@ protected PrimitiveReader(ColumnDescriptor desc) { @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - column.setPageSource(pageStore.getPageReader(desc)); + this.setPageSource(pageStore, rowPosition, Optional.empty()); + } + + @Override + public void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) { + column.setPageSource(pageStore.getPageReader(desc), rowRanges); } @Override @@ -405,7 +445,13 @@ private static class OptionReader implements ParquetValueReader { 
@Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - reader.setPageSource(pageStore, rowPosition); + this.setPageSource(pageStore, rowPosition, Optional.empty()); + } + + @Override + public void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) { + reader.setPageSource(pageStore, rowPosition, rowRanges); } @Override @@ -450,7 +496,13 @@ protected RepeatedReader( @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - reader.setPageSource(pageStore, rowPosition); + this.setPageSource(pageStore, rowPosition, Optional.empty()); + } + + @Override + public void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) { + reader.setPageSource(pageStore, rowPosition, rowRanges); } @Override @@ -569,8 +621,14 @@ protected RepeatedKeyValueReader( @Override public void setPageSource(PageReadStore pageStore, long rowPosition) { - keyReader.setPageSource(pageStore, rowPosition); - valueReader.setPageSource(pageStore, rowPosition); + this.setPageSource(pageStore, rowPosition, Optional.empty()); + } + + @Override + public void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) { + keyReader.setPageSource(pageStore, rowPosition, rowRanges); + valueReader.setPageSource(pageStore, rowPosition, rowRanges); } @Override @@ -704,6 +762,8 @@ private interface Setter { private final TripleIterator column; private final List> children; + private boolean topLevel = false; + @SuppressWarnings("unchecked") protected StructReader(List types, List> readers) { this.readers = @@ -725,10 +785,20 @@ protected StructReader(List types, List> readers) { this.column = firstNonNullColumn(children); } + public final void topLevel() { + this.topLevel = true; + } + @Override public final void setPageSource(PageReadStore pageStore, long rowPosition) { + this.setPageSource(pageStore, rowPosition, Optional.empty()); + } + + @Override + public final void setPageSource( + PageReadStore pageStore, long rowPosition, Optional rowRanges) { for (ParquetValueReader reader : readers) { - reader.setPageSource(pageStore, rowPosition); + reader.setPageSource(pageStore, rowPosition, rowRanges); } } @@ -739,6 +809,12 @@ public final TripleIterator column() { @Override public final T read(T reuse) { + if (topLevel && column.needsSynchronize()) { + for (TripleIterator child : children) { + child.synchronize(); + } + } + I intermediate = newStructData(reuse); for (int i = 0; i < readers.length; i += 1) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index da91e4dfa56a..ff9d275c8193 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -40,6 +40,8 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.schema.MessageType; /** @@ -60,11 +62,12 @@ class ReadConf { private final boolean reuseContainers; private final Integer batchSize; private final long[] startRowPositions; + private final RowRanges[] rowRangesArr; // List of column chunk metadata for each row group private final List> columnChunkMetaDataForRowGroups; - 
@SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "CyclomaticComplexity"}) ReadConf( InputFile file, ParquetReadOptions options, @@ -75,7 +78,8 @@ class ReadConf { NameMapping nameMapping, boolean reuseContainers, boolean caseSensitive, - Integer bSize) { + Integer bSize, + boolean useColumnIndexFilter) { this.file = file; this.options = options; this.reader = newReader(file, options); @@ -96,6 +100,7 @@ class ReadConf { this.rowGroups = reader.getRowGroups(); this.shouldSkip = new boolean[rowGroups.size()]; this.startRowPositions = new long[rowGroups.size()]; + this.rowRangesArr = new RowRanges[rowGroups.size()]; // Fetch all row groups starting positions to compute the row offsets of the filtered row groups Map offsetToStartPos = generateOffsetToStartPos(expectedSchema); @@ -103,10 +108,14 @@ class ReadConf { ParquetMetricsRowGroupFilter statsFilter = null; ParquetDictionaryRowGroupFilter dictFilter = null; ParquetBloomRowGroupFilter bloomFilter = null; + ParquetColumnIndexFilter columnIndexFilter = null; if (filter != null) { statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive); dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive); bloomFilter = new ParquetBloomRowGroupFilter(expectedSchema, filter, caseSensitive); + if (useColumnIndexFilter) { + columnIndexFilter = new ParquetColumnIndexFilter(expectedSchema, filter, caseSensitive); + } } long computedTotalValues = 0L; @@ -121,9 +130,24 @@ class ReadConf { typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)) && bloomFilter.shouldRead( typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup))); + + if (useColumnIndexFilter && filter != null && shouldRead) { + ColumnIndexStore columnIndexStore = reader.getColumnIndexStore(i); + RowRanges rowRanges = + columnIndexFilter.calculateRowRanges( + typeWithIds, columnIndexStore, rowGroup.getRowCount()); + + if (rowRanges.getRanges().size() == 0) { + shouldRead = false; + } else if (rowRanges.rowCount() != rowGroup.getRowCount()) { + rowRangesArr[i] = rowRanges; + } + } + this.shouldSkip[i] = !shouldRead; if (shouldRead) { - computedTotalValues += rowGroup.getRowCount(); + computedTotalValues += + rowRangesArr[i] == null ? rowGroup.getRowCount() : rowRangesArr[i].rowCount(); } } @@ -156,6 +180,7 @@ private ReadConf(ReadConf toCopy) { this.vectorizedModel = toCopy.vectorizedModel; this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups; this.startRowPositions = toCopy.startRowPositions; + this.rowRangesArr = toCopy.rowRangesArr; } ParquetFileReader reader() { @@ -181,6 +206,10 @@ boolean[] shouldSkip() { return shouldSkip; } + RowRanges[] rowRangesArr() { + return rowRangesArr; + } + private Map generateOffsetToStartPos(Schema schema) { if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) { return null; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java b/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java index 5a833d4c4447..7eb91d476fd1 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java @@ -129,4 +129,16 @@ default Binary nextBinary() { * @throws java.util.NoSuchElementException if there are no more elements */ N nextNull(); + + /** + * Returns true when some triples in this iterator might need to be skipped. 
+ * + * @return whether this iterator needs to be synchronized + */ + default boolean needsSynchronize() { + return false; + } + + /** Skips triples to synchronize the row reading. */ + default void synchronize() {} } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 773e0f7a85d0..e2a61477c682 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -87,7 +87,8 @@ private ReadConf init() { nameMapping, reuseContainers, caseSensitive, - batchSize); + batchSize, + false); this.conf = readConf.copy(); return readConf; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestColumnIndexFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestColumnIndexFilter.java new file mode 100644 index 000000000000..3b37a3da19dd --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestColumnIndexFilter.java @@ -0,0 +1,943 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThan; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNaN; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notEqual; +import static org.apache.iceberg.expressions.Expressions.notIn; +import static org.apache.iceberg.expressions.Expressions.notNaN; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.notStartsWith; +import static org.apache.iceberg.expressions.Expressions.or; +import static org.apache.iceberg.expressions.Expressions.startsWith; +import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING; +import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.DESCENDING; +import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.UNORDERED; +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.optional; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Locale; +import java.util.PrimitiveIterator; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DecimalUtil; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.internal.column.columnindex.BoundaryOrder; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.junit.Assert; +import org.junit.Test; + +public class TestColumnIndexFilter { + /** COPIED FROM org.apache.parquet.internal.filter2.columnindex.TestColumnIndexFilter */ 
+ private static class CIBuilder { + private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]); + private final PrimitiveType type; + private final BoundaryOrder order; + boolean invalid = false; + private List nullPages = Lists.newArrayList(); + private List nullCounts = Lists.newArrayList(); + private List minValues = Lists.newArrayList(); + private List maxValues = Lists.newArrayList(); + + CIBuilder(PrimitiveType type, BoundaryOrder order) { + this.type = type; + this.order = order; + } + + CIBuilder addNullPage(long nullCount) { + nullPages.add(true); + nullCounts.add(nullCount); + minValues.add(EMPTY); + maxValues.add(EMPTY); + return this; + } + + CIBuilder addPage(long nullCount, byte[] min, byte[] max) { + nullPages.add(false); + nullCounts.add(nullCount); + minValues.add(ByteBuffer.wrap(min)); + maxValues.add(ByteBuffer.wrap(max)); + return this; + } + + CIBuilder addPage(long nullCount, int min, int max) { + nullPages.add(false); + nullCounts.add(nullCount); + minValues.add( + ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(0, min)); + maxValues.add( + ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(0, max)); + return this; + } + + CIBuilder addPage(long nullCount, long min, long max) { + nullPages.add(false); + nullCounts.add(nullCount); + minValues.add(ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN).putLong(0, min)); + maxValues.add(ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN).putLong(0, max)); + return this; + } + + CIBuilder addPage(long nullCount, String min, String max) { + nullPages.add(false); + nullCounts.add(nullCount); + minValues.add(ByteBuffer.wrap(min.getBytes(UTF_8))); + maxValues.add(ByteBuffer.wrap(max.getBytes(UTF_8))); + return this; + } + + CIBuilder addPage(long nullCount, double min, double max) { + if (Double.isNaN(min) || Double.isNaN(max)) { + invalid = true; + return this; + } + + nullPages.add(false); + nullCounts.add(nullCount); + minValues.add( + ByteBuffer.allocate(Double.BYTES).order(ByteOrder.LITTLE_ENDIAN).putDouble(0, min)); + maxValues.add( + ByteBuffer.allocate(Double.BYTES).order(ByteOrder.LITTLE_ENDIAN).putDouble(0, max)); + return this; + } + + ColumnIndex build() { + return invalid + ? 
null + : ColumnIndexBuilder.build(type, order, nullPages, nullCounts, minValues, maxValues); + } + } + + private static class OIBuilder { + private final OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); + + OIBuilder addPage(long rowCount) { + builder.add(1234, rowCount); + return this; + } + + OffsetIndex build() { + return builder.build(); + } + } + + private static final long TOTAL_ROW_COUNT = 30; + private static final String INT_COL = "int_col"; + private static final String STR_COL = "str_col"; + private static final String NO_NANS = "no_nans"; + private static final String NO_CI = "no_ci"; + private static final String ALL_NULLS = "all_nulls"; + private static final String INT_DECIMAL_7_2 = "int_decimal_7_2"; + private static final String NOT_IN_FILE = "not_in_file"; + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.optional(1, INT_COL, Types.IntegerType.get()), + Types.NestedField.optional(2, STR_COL, Types.StringType.get()), + Types.NestedField.optional(3, NO_NANS, Types.DoubleType.get()), + Types.NestedField.optional(4, NO_CI, Types.DoubleType.get()), + Types.NestedField.optional(5, ALL_NULLS, Types.LongType.get()), + Types.NestedField.optional(6, INT_DECIMAL_7_2, Types.DecimalType.of(7, 2)), + Types.NestedField.optional(7, NOT_IN_FILE, Types.LongType.get())); + + private static final MessageType FILE_SCHEMA = + org.apache.parquet.schema.Types.buildMessage() + .addField(optional(INT32).id(1).named(INT_COL)) + .addField( + optional(BINARY).id(2).as(LogicalTypeAnnotation.stringType()).id(2).named(STR_COL)) + .addField(optional(DOUBLE).id(3).named(NO_NANS)) + .addField(optional(DOUBLE).id(4).named(NO_CI)) + .addField(optional(INT64).id(5).named(ALL_NULLS)) + .addField( + optional(INT32) + .id(6) + .as(LogicalTypeAnnotation.decimalType(2, 9)) + .named(INT_DECIMAL_7_2)) + .named("table"); + + private static final ColumnIndex INT_COL_CI = + new CIBuilder(optional(INT32).named(INT_COL), ASCENDING) + .addPage(0, 1, 1) + .addPage(1, 2, 6) + .addPage(0, 7, 7) + .addPage(1, 7, 10) + .addPage(0, 11, 17) + .addPage(0, 18, 23) + .addPage(0, 24, 26) + .build(); + private static final OffsetIndex INT_COL_OI = + new OIBuilder() + .addPage(1) + .addPage(6) + .addPage(2) + .addPage(5) + .addPage(7) + .addPage(6) + .addPage(3) + .build(); + private static final ColumnIndex STR_COL_CI = + new CIBuilder(optional(BINARY).as(stringType()).named(STR_COL), DESCENDING) + .addPage(0, "Zulu", "Zulu") + .addPage(0, "Whiskey", "Yankee") + .addPage(1, "Tango", "Victor") + .addNullPage(3) + .addPage(0, "Oscar", "Sierra") + .addPage(0, "Juliett", "November") + .addPage(0, "Bravo", "India") + .addPage(0, "Alfa", "Alfa") + .build(); + private static final OffsetIndex STR_COL_OI = + new OIBuilder() + .addPage(1) + .addPage(3) + .addPage(4) + .addPage(3) + .addPage(5) + .addPage(5) + .addPage(8) + .addPage(1) + .build(); + private static final ColumnIndex NO_NANS_CI = + new CIBuilder(optional(DOUBLE).named(NO_NANS), UNORDERED) + .addPage(0, 2.03, 2.03) + .addPage(0, 0.56, 8.71) + .addPage(2, 3.14, 3.50) + .addPage(0, 2.71, 9.99) + .addPage(3, 0.36, 5.32) + .addPage(0, 4.17, 7.95) + .addNullPage(4) + .build(); + private static final OffsetIndex NO_NANS_OI = + new OIBuilder() + .addPage(1) + .addPage(5) + .addPage(4) + .addPage(6) + .addPage(7) + .addPage(3) + .addPage(4) + .build(); + private static final ColumnIndex NO_CI_CI = null; + private static final OffsetIndex NO_CI_OI = + new OIBuilder() + .addPage(1) + .addPage(3) + .addPage(2) + .addPage(1) + .addPage(5) + .addPage(4) + 
.addPage(5) + .addPage(7) + .addPage(2) + .build(); + private static final ColumnIndex ALL_NULLS_CI = + new CIBuilder(optional(INT64).named(ALL_NULLS), ASCENDING) + .addNullPage(1) + .addNullPage(29) + .build(); + private static final OffsetIndex ALL_NULLS_OI = new OIBuilder().addPage(1).addPage(29).build(); + private static final ColumnIndex INT_DECIMAL_7_2_CI = + new CIBuilder(optional(INT32).named(INT_DECIMAL_7_2), UNORDERED) + .addPage(0, 99, 99) + .addPage(0, 100, 100) + .addPage(0, 101, 101) + .addPage(0, 98, 98) + .addPage(0, 99, 103) + .addNullPage(4) + .addPage(0, 100, 100) + .addPage(2, 87, 109) + .addNullPage(2) + .build(); + private static final OffsetIndex INT_DECIMAL_7_2_OI = + new OIBuilder() + .addPage(1) + .addPage(3) + .addPage(2) + .addPage(1) + .addPage(5) + .addPage(4) + .addPage(5) + .addPage(7) + .addPage(2) + .build(); + private static final ColumnIndexStore STORE = + new ColumnIndexStore() { + @Override + public ColumnIndex getColumnIndex(ColumnPath column) { + switch (column.toDotString()) { + case INT_COL: + return INT_COL_CI; + case STR_COL: + return STR_COL_CI; + case NO_NANS: + return NO_NANS_CI; + case NO_CI: + return NO_CI_CI; + case ALL_NULLS: + return ALL_NULLS_CI; + case INT_DECIMAL_7_2: + return INT_DECIMAL_7_2_CI; + default: + return null; + } + } + + @Override + public OffsetIndex getOffsetIndex(ColumnPath column) { + switch (column.toDotString()) { + case INT_COL: + return INT_COL_OI; + case STR_COL: + return STR_COL_OI; + case NO_NANS: + return NO_NANS_OI; + case NO_CI: + return NO_CI_OI; + case ALL_NULLS: + return ALL_NULLS_OI; + case INT_DECIMAL_7_2: + return INT_DECIMAL_7_2_OI; + default: + throw new MissingOffsetIndexException(column); + } + } + }; + + /** + * + * + *
+   * <pre>
+   * row   int_col       str_col        no_nans        no_ci          all_nulls      int_decimal_7_2
+   *                                                 (no column index)
+   *      ------0------  ------0------  ------0------  ------0------  ------0------  ------0------
+   * 0.   1              Zulu           2.03                          null           99
+   *      ------1------  ------1------  ------1------  ------1------  ------1------  ------1------
+   * 1.   2              Yankee         4.67                          null           100
+   * 2.   3              Xray           3.42                          null           100
+   * 3.   4              Whiskey        8.71                          null           100
+   *                     ------2------                 ------2------                 ------2------
+   * 4.   5              Victor         0.56                          null           101
+   * 5.   6              Uniform        4.30                          null           101
+   *                                    ------2------  ------3------                 ------3------
+   * 6.   null           null           null                          null           98
+   *      ------2------                                ------4------                 ------4------
+   * 7.   7              Tango          3.50                          null           102
+   *                     ------3------
+   * 8.   7              null           3.14                          null           103
+   *      ------3------
+   * 9.   7              null           null                          null           99
+   *                                    ------3------
+   * 10.  null           null           9.99                          null           100
+   *                     ------4------
+   * 11.  8              Sierra         8.78                          null           99
+   *                                                   ------5------                 ------5------
+   * 12.  9              Romeo          9.56                          null           null
+   * 13.  10             Quebec         2.71                          null           null
+   *      ------4------
+   * 14.  11             Papa           5.71                          null           null
+   * 15.  12             Oscar          4.09                          null           null
+   *                     ------5------  ------4------  ------6------                 ------6------
+   * 16.  13             November       null                          null           100
+   * 17.  14             Mike           null                          null           100
+   * 18.  15             Lima           0.36                          null           100
+   * 19.  16             Kilo           2.94                          null           100
+   * 20.  17             Juliett        4.23                          null           100
+   *      ------5------  ------6------                 ------7------                 ------7------
+   * 21.  18             India          null                          null           109
+   * 22.  19             Hotel          5.32                          null           108
+   *                                    ------5------
+   * 23.  20             Golf           4.17                          null           88
+   * 24.  21             Foxtrot        7.92                          null           87
+   * 25.  22             Echo           7.95                          null           88
+   *                                    ------6------
+   * 26.  23             Delta          null                          null           88
+   *      ------6------
+   * 27.  24             Charlie        null                          null           88
+   *                                                   ------8------                 ------8------
+   * 28.  25             Bravo          null                          null           null
+   *                     ------7------
+   * 29.  26             Alfa           null                          null           null
+   * </pre>
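+   * +   * <p>Worked example: {@code isNull(INT_COL)} selects the int_col pages whose null count is +   * non-zero: pages 1 and 3. In INT_COL_OI, page 1 starts at row 1 and holds 6 rows, and page 3 +   * starts at row 9 and holds 5 rows, so the expected row ranges are [1, 6] and [9, 13] (see +   * testIsNulls below).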
+ */ + private static final RowRanges ALL_ROWS = RowRanges.createSingle(TOTAL_ROW_COUNT); + + private static final RowRanges NO_ROWS = RowRanges.EMPTY; + + private static RowRanges selectRowRanges(String path, int... pageIndexes) { + return selectRowRanges(path, STORE, TOTAL_ROW_COUNT, pageIndexes); + } + + private static RowRanges selectRowRanges( + String path, ColumnIndexStore store, long rowCount, int... pageIndexes) { + return RowRanges.create( + rowCount, + // hasNext() advances the cursor, so it must be called exactly once before each nextInt() + new PrimitiveIterator.OfInt() { + int index = -1; + + @Override + public int nextInt() { + return pageIndexes[index]; + } + + @Override + public boolean hasNext() { + index += 1; + return index < pageIndexes.length; + } + }, + store.getOffsetIndex(ColumnPath.fromDotString(path))); + } + + private boolean rowRangesEquals(RowRanges r1, RowRanges r2) { + if (r1 == r2) { + return true; + } + + if (r1 == null || r2 == null) { + return false; + } + + List<RowRanges.Range> ranges1 = r1.getRanges(); + List<RowRanges.Range> ranges2 = r2.getRanges(); + + if (ranges1.size() != ranges2.size()) { + return false; + } + + for (int i = 0; i < ranges1.size(); i += 1) { + RowRanges.Range range1 = ranges1.get(i); + RowRanges.Range range2 = ranges2.get(i); + if (range1.from != range2.from || range1.to != range2.to) { + return false; + } + } + + return true; + } + + private void assertRowRangesEquals(RowRanges expected, RowRanges actual) { + if (!rowRangesEquals(expected, actual)) { + throw new AssertionError( + String.format("RowRanges are not equal, expected: %s, actual: %s", expected, actual)); + } + } + + private RowRanges calculateRowRanges(Expression expr) { + return calculateRowRanges(expr, true); + } + + private RowRanges calculateRowRanges(Expression expr, boolean caseSensitive) { + return calculateRowRanges(SCHEMA, FILE_SCHEMA, expr, caseSensitive, STORE, TOTAL_ROW_COUNT); + } + + private RowRanges calculateRowRanges( + Schema schema, + MessageType messageType, + Expression expr, + boolean caseSensitive, + ColumnIndexStore store, + long rowCount) { + return new ParquetColumnIndexFilter(schema, expr, caseSensitive) + .calculateRowRanges(messageType, store, rowCount); + } + + @Test + public void testIsNulls() { + RowRanges expected; + + expected = selectRowRanges(INT_COL, 1, 3); + assertRowRangesEquals(expected, calculateRowRanges(isNull(INT_COL))); + + expected = selectRowRanges(STR_COL, 2, 3); + assertRowRangesEquals(expected, calculateRowRanges(isNull(STR_COL))); + + expected = selectRowRanges(NO_NANS, 2, 4, 6); + assertRowRangesEquals(expected, calculateRowRanges(isNull(NO_NANS))); + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(isNull(ALL_NULLS))); + } + + @Test + public void testNotNulls() { + RowRanges expected; + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(notNull(INT_COL))); + + expected = selectRowRanges(STR_COL, 0, 1, 2, 4, 5, 6, 7); + assertRowRangesEquals(expected, calculateRowRanges(notNull(STR_COL))); + + expected = selectRowRanges(NO_NANS, 0, 1, 2, 3, 4, 5); + assertRowRangesEquals(expected, calculateRowRanges(notNull(NO_NANS))); + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(notNull(ALL_NULLS))); + } + + @Test + public void testIsNaN() { + RowRanges expected; + + // column index exists, null page 6 should be filtered out + expected = selectRowRanges(NO_NANS, 0, 1, 2, 3, 4, 5); + assertRowRangesEquals(expected, calculateRowRanges(isNaN(NO_NANS))); + + assertRowRangesEquals(ALL_ROWS, calculateRowRanges(isNaN(NO_CI))); + } + + @Test + public void testNotNaN() { + RowRanges 
expected; + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(notNaN(NO_NANS))); + + assertRowRangesEquals(expected, calculateRowRanges(notNaN(NO_CI))); + } + + @Test + public void testMissingColumn() { + Assert.assertThrows( + "Cannot find field 'missing'", + ValidationException.class, + () -> calculateRowRanges(equal("missing", 0))); + } + + @Test + public void testColumnNotInFile() { + RowRanges expected; + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(notNull(NOT_IN_FILE))); + } + + @Test + public void testMissingColumnIndex() { + RowRanges expected = ALL_ROWS; + + assertRowRangesEquals(expected, calculateRowRanges(isNull(NO_CI))); + assertRowRangesEquals(expected, calculateRowRanges(notNull(NO_CI))); + assertRowRangesEquals(expected, calculateRowRanges(greaterThan(NO_CI, 9))); + assertRowRangesEquals(expected, calculateRowRanges(lessThan(NO_CI, 9))); + assertRowRangesEquals(expected, calculateRowRanges(equal(NO_CI, 9))); + assertRowRangesEquals(expected, calculateRowRanges(notEqual(NO_CI, 9))); + } + + @Test + public void testNot() { + // ColumnIndexEvalVisitor does not support evaluating NOT expressions; NOT is expected to be + // rewritten away before evaluation + RowRanges expected; + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(not(lessThan(INT_COL, 1)))); + + expected = selectRowRanges(INT_COL, 1, 2, 3, 4, 5, 6); + assertRowRangesEquals(expected, calculateRowRanges(not(lessThanOrEqual(INT_COL, 1)))); + } + + @Test + public void testAnd() { + RowRanges expected; + Expression expr; + + expected = NO_ROWS; + expr = and(equal(INT_COL, 1), equal(INT_COL, 2)); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + + expr = and(equal(INT_COL, 1), equal(STR_COL, "Alfa")); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + + expr = and(equal(INT_COL, 2), equal(STR_COL, "Tango")); + expected = RowRanges.intersection(selectRowRanges(INT_COL, 1), selectRowRanges(STR_COL, 2)); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + } + + @Test + public void testOr() { + RowRanges expected; + Expression expr; + + expected = selectRowRanges(INT_COL, 0, 1); + expr = or(equal(INT_COL, 1), equal(INT_COL, 2)); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + + expected = RowRanges.union(selectRowRanges(INT_COL, 0), selectRowRanges(STR_COL, 7)); + expr = or(equal(INT_COL, 1), equal(STR_COL, "Alfa")); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + + expr = or(equal(INT_COL, 2), equal(STR_COL, "Tango")); + expected = RowRanges.union(selectRowRanges(INT_COL, 1), selectRowRanges(STR_COL, 2)); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + } + + @Test + public void testIntegerLt() { + RowRanges expected; + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(lessThan(INT_COL, 1))); + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(lessThan(INT_COL, 27))); + + expected = selectRowRanges(INT_COL, 0, 1); + assertRowRangesEquals(expected, calculateRowRanges(lessThan(INT_COL, 7))); + + expected = selectRowRanges(INT_COL, 0, 1, 2, 3); + assertRowRangesEquals(expected, calculateRowRanges(lessThan(INT_COL, 10))); + } + + @Test + public void testIntegerLtEq() { + RowRanges expected; + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(lessThanOrEqual(INT_COL, 0))); + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(lessThanOrEqual(INT_COL, 27))); + + expected = selectRowRanges(INT_COL, 
0, 1, 2, 3); + assertRowRangesEquals(expected, calculateRowRanges(lessThanOrEqual(INT_COL, 7))); + + expected = selectRowRanges(INT_COL, 0, 1, 2, 3, 4); + assertRowRangesEquals(expected, calculateRowRanges(lessThanOrEqual(INT_COL, 11))); + + expected = selectRowRanges(INT_COL, 0); + assertRowRangesEquals(expected, calculateRowRanges(lessThanOrEqual(INT_COL, 1))); + } + + @Test + public void testIntegerGt() { + RowRanges expected; + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(greaterThan(INT_COL, 26))); + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(greaterThan(INT_COL, 0))); + + expected = selectRowRanges(INT_COL, 3, 4, 5, 6); + assertRowRangesEquals(expected, calculateRowRanges(greaterThan(INT_COL, 7))); + } + + @Test + public void testIntegerGtEq() { + RowRanges expected; + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(greaterThanOrEqual(INT_COL, 27))); + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(greaterThanOrEqual(INT_COL, 1))); + + expected = selectRowRanges(INT_COL, 2, 3, 4, 5, 6); + assertRowRangesEquals(expected, calculateRowRanges(greaterThanOrEqual(INT_COL, 7))); + } + + @Test + public void testIntegerEq() { + RowRanges expected; + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(equal(INT_COL, 0))); + + expected = selectRowRanges(INT_COL, 2, 3); + assertRowRangesEquals(expected, calculateRowRanges(equal(INT_COL, 7))); + + expected = selectRowRanges(INT_COL, 0); + assertRowRangesEquals(expected, calculateRowRanges(equal(INT_COL, 1))); + } + + @Test + public void testIntegerNotEq() { + RowRanges expected; + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(notEqual(INT_COL, 0))); + + assertRowRangesEquals(expected, calculateRowRanges(notEqual(INT_COL, 7))); + } + + @Test + public void testCaseInsensitive() { + RowRanges expected; + + String intColAllCaps = INT_COL.toUpperCase(Locale.ROOT); + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(equal(intColAllCaps, 0), false)); + + expected = selectRowRanges(INT_COL, 2, 3); + assertRowRangesEquals(expected, calculateRowRanges(equal(intColAllCaps, 7), false)); + + expected = selectRowRanges(INT_COL, 0); + assertRowRangesEquals(expected, calculateRowRanges(equal(intColAllCaps, 1), false)); + } + + @Test + public void testStringStartsWith() { + RowRanges expected; + + expected = NO_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(startsWith(STR_COL, "?"))); + + assertRowRangesEquals(expected, calculateRowRanges(startsWith(STR_COL, "s"))); + + expected = selectRowRanges(STR_COL, 4); + assertRowRangesEquals(expected, calculateRowRanges(startsWith(STR_COL, "S"))); + + expected = selectRowRanges(STR_COL, 4, 6); + assertRowRangesEquals( + expected, + calculateRowRanges(Expressions.or(startsWith(STR_COL, "Q"), startsWith(STR_COL, "G")))); + + expected = selectRowRanges(STR_COL, 0); + assertRowRangesEquals(expected, calculateRowRanges(startsWith(STR_COL, "Z"))); + } + + @Test + public void testStringNotStartsWith() { + RowRanges expected; + + expected = selectRowRanges(STR_COL, 1, 2, 3, 4, 5, 6, 7); + assertRowRangesEquals(expected, calculateRowRanges(notStartsWith(STR_COL, "Z"))); + + expected = selectRowRanges(STR_COL, 0, 1, 2, 3, 4, 5, 6); + assertRowRangesEquals(expected, calculateRowRanges(notStartsWith(STR_COL, "A"))); + + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(notStartsWith(STR_COL, "B"))); + } + 
+ @Test + public void testIntegerIn() { + RowRanges expected; + Expression expr; + + expr = in(INT_COL, 7, 13); + expected = selectRowRanges(INT_COL, 2, 3, 4); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + } + + @Test + public void testIntegerNotIn() { + RowRanges expected; + Expression expr; + + expr = notIn(INT_COL, 7, 13); + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(expr)); + } + + @Test + public void testSomeNullsNotEq() { + RowRanges expected; + Expression expr; + + expr = notEqual(STR_COL, "equal"); + expected = ALL_ROWS; + assertRowRangesEquals(expected, calculateRowRanges(expr)); + } + + @Test + public void testIntTypePromotion() { + RowRanges expected; + Schema promotedLong = new Schema(Types.NestedField.optional(1, INT_COL, Types.LongType.get())); + + expected = NO_ROWS; + RowRanges actual = + calculateRowRanges( + promotedLong, FILE_SCHEMA, equal(INT_COL, 0), true, STORE, TOTAL_ROW_COUNT); + assertRowRangesEquals(expected, actual); + + expected = selectRowRanges(INT_COL, 2, 3); + actual = + calculateRowRanges( + promotedLong, FILE_SCHEMA, equal(INT_COL, 7), true, STORE, TOTAL_ROW_COUNT); + assertRowRangesEquals(expected, actual); + } + + @Test + public void testMissingOffsetIndex() { + RowRanges expected; + + PrimitiveType missingOI = + org.apache.parquet.schema.Types.primitive(INT32, Type.Repetition.REQUIRED) + .id(1) + .named("missing_oi"); + MessageType messageType = new MessageType("table", missingOI); + + expected = ALL_ROWS; + RowRanges actual = + calculateRowRanges(SCHEMA, messageType, equal(INT_COL, 1), true, STORE, TOTAL_ROW_COUNT); + assertRowRangesEquals(expected, actual); + } + + @Test + public void testIntBackedDecimal() { + RowRanges expected; + + Expression expr = equal(INT_DECIMAL_7_2, new BigDecimal("1.00")); + expected = selectRowRanges(INT_DECIMAL_7_2, 1, 4, 6, 7); + + assertRowRangesEquals(expected, calculateRowRanges(expr)); + + expr = + or( + lessThan(INT_DECIMAL_7_2, new BigDecimal("1.00")), + greaterThan(INT_DECIMAL_7_2, new BigDecimal("1.01"))); + + expected = selectRowRanges(INT_DECIMAL_7_2, 0, 3, 4, 7); + assertRowRangesEquals(expected, calculateRowRanges(expr)); + } + + @Test + public void testDecimalTypePromotion() { + RowRanges expected; + + Schema promotedDecimal = + new Schema(Types.NestedField.optional(6, INT_DECIMAL_7_2, Types.DecimalType.of(38, 2))); + + Expression expr = equal(INT_DECIMAL_7_2, new BigDecimal("1.00")); + expected = selectRowRanges(INT_DECIMAL_7_2, 1, 4, 6, 7); + assertRowRangesEquals( + expected, + calculateRowRanges(promotedDecimal, FILE_SCHEMA, expr, true, STORE, TOTAL_ROW_COUNT)); + + expr = + or( + lessThan(INT_DECIMAL_7_2, new BigDecimal("1.00")), + greaterThan(INT_DECIMAL_7_2, new BigDecimal("1.01"))); + + expected = selectRowRanges(INT_DECIMAL_7_2, 0, 3, 4, 7); + assertRowRangesEquals( + expected, + calculateRowRanges(promotedDecimal, FILE_SCHEMA, expr, true, STORE, TOTAL_ROW_COUNT)); + } + + // converts a decimal string to its fixed-length byte representation (precision 38, scale 10) + private byte[] decimalToBytes(String decimalStr) { + BigDecimal decimal = new BigDecimal(decimalStr).setScale(10); + int requiredBytes = TypeUtil.decimalRequiredBytes(38); + byte[] bytes = new byte[requiredBytes]; + return DecimalUtil.toReusedFixLengthBytes(38, 10, decimal, bytes); + } + + @Test + public void testBinaryBackedDecimal() { + String binaryDecimal = "decimal_38_10"; + long rowCount = 9; + + ColumnIndex binaryDecimalCI = + new CIBuilder( + optional(FIXED_LEN_BYTE_ARRAY) + .length(TypeUtil.decimalRequiredBytes(38)) + .named(binaryDecimal), + ASCENDING) + .addPage(0, decimalToBytes("12.34"), decimalToBytes("12.35")) + .addPage( + 0, 
decimalToBytes("1234567890.987654321"), decimalToBytes("1234567890.987654323")) + .build(); + + OffsetIndex binaryDecimalOI = new OIBuilder().addPage(5).addPage(4).build(); + + ColumnIndexStore columnIndexStore = + new ColumnIndexStore() { + @Override + public ColumnIndex getColumnIndex(ColumnPath columnPath) { + switch (columnPath.toDotString()) { + case "decimal_38_10": + return binaryDecimalCI; + default: + return null; + } + } + + @Override + public OffsetIndex getOffsetIndex(ColumnPath columnPath) { + switch (columnPath.toDotString()) { + case "decimal_38_10": + return binaryDecimalOI; + default: + throw new MissingOffsetIndexException(columnPath); + } + } + }; + + MessageType messageType = + org.apache.parquet.schema.Types.buildMessage() + .addField( + optional(FIXED_LEN_BYTE_ARRAY) + .length(TypeUtil.decimalRequiredBytes(38)) + .id(1) + .as(LogicalTypeAnnotation.decimalType(10, 38)) + .named(binaryDecimal)) + .named("decimal"); + + Schema schema = + new Schema(Types.NestedField.optional(1, binaryDecimal, Types.DecimalType.of(38, 10))); + + Expression expr = + or( + lessThan(binaryDecimal, new BigDecimal("12.34")), + greaterThanOrEqual(binaryDecimal, new BigDecimal("1234567890.987654322"))); + + RowRanges expected = selectRowRanges(binaryDecimal, columnIndexStore, rowCount, 1); + RowRanges actual = + calculateRowRanges(schema, messageType, expr, true, columnIndexStore, rowCount); + assertRowRangesEquals(expected, actual); + + expr = greaterThan(binaryDecimal, new BigDecimal("1234567890.987654323")); + expected = NO_ROWS; + actual = calculateRowRanges(schema, messageType, expr, true, columnIndexStore, rowCount); + assertRowRangesEquals(expected, actual); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index af16d9bbc290..b54dd1edcca4 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -132,7 +132,9 @@ private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> protected ParquetValueReader<?> message( Types.StructType expected, MessageType message, List<ParquetValueReader<?>> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); + StructReader struct = (StructReader) struct(expected, message.asGroupType(), fieldReaders); + struct.topLevel(); + return struct; } @Override diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetPageSkipping.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetPageSkipping.java new file mode 100644 index 000000000000..cd8a7d58c432 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetPageSkipping.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data; + +import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetAvroWriter; +import org.apache.iceberg.relocated.com.google.common.base.Function; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSparkParquetPageSkipping { + + private static final Types.StructType PRIMITIVES = + Types.StructType.of( + required(0, "_long", Types.LongType.get()), + optional(1, "_string", Types.StringType.get()), // var width + required(2, "_bool", Types.BooleanType.get()), + optional(3, "_int", Types.IntegerType.get()), + optional(4, "_float", Types.FloatType.get()), + required(5, "_double", Types.DoubleType.get()), + optional(6, "_date", Types.DateType.get()), + required(7, "_ts", Types.TimestampType.withZone()), + required(8, "_fixed", Types.FixedType.ofLength(7)), + optional(9, "_bytes", Types.BinaryType.get()), // var width + required(10, "_dec_9_0", Types.DecimalType.of(9, 0)), // int + required(11, "_dec_11_2", Types.DecimalType.of(11, 2)), // long + required(12, "_dec_38_10", Types.DecimalType.of(38, 10)) // fixed + ); + + private static final Schema PRIMITIVES_SCHEMA = new Schema(PRIMITIVES.fields()); + + private static final 
Types.StructType LIST = + Types.StructType.of( + optional(13, "_list", Types.ListType.ofOptional(14, Types.StringType.get()))); + private static final Types.StructType MAP = + Types.StructType.of( + optional( + 15, + "_map", + Types.MapType.ofOptional(16, 17, Types.StringType.get(), Types.StringType.get()))); + private static final Schema COMPLEX_SCHEMA = + new Schema( + Lists.newArrayList(Iterables.concat(PRIMITIVES.fields(), LIST.fields(), MAP.fields()))); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + private File testFile; + private List<GenericData.Record> allRecords = Lists.newArrayList(); + + /* Column and offset index info for the `_long` column in `testFile`, copied from the output of + parquet-cli's column-index command: + + row-group 0: + column index for column _long: + Boudary order: ASCENDING + null count min max + page-0 0 0 56 + page-1 0 57 113 + page-2 0 114 170 + page-3 0 171 227 + page-4 0 228 284 + page-5 0 285 341 + page-6 0 342 398 + page-7 0 399 455 + page-8 0 456 512 + page-9 0 513 569 + page-10 0 570 592 + + offset index for column _long: + offset compressed size first row index + page-0 4 137 0 + page-1 141 138 57 + page-2 279 137 114 + page-3 416 138 171 + page-4 554 137 228 + page-5 691 141 285 + page-6 832 140 342 + page-7 972 141 399 + page-8 1113 141 456 + page-9 1254 140 513 + page-10 1394 92 570 + + + row-group 1: + column index for column _long: + Boudary order: ASCENDING + null count min max + page-0 0 593 649 + page-1 0 650 706 + page-2 0 707 763 + page-3 0 764 820 + page-4 0 821 877 + page-5 0 878 934 + page-6 0 935 991 + page-7 0 992 999 + + offset index for column _long: + offset compressed size first row index + page-0 498681 140 0 + page-1 498821 140 57 + page-2 498961 141 114 + page-3 499102 141 171 + page-4 499243 141 228 + page-5 499384 140 285 + page-6 499524 142 342 + page-7 499666 68 399 + + For example, in row group 0, page-1 of _long starts at row 57 and page-2 starts at row 114, so + a filter that matches only page-1 must select exactly the row range [57, 113] (see + testSinglePageMatch). + */ + + private long index = -1; + private static final int ABOVE_INT_COL_MAX_VALUE = Integer.MAX_VALUE; + + @Before + public void generateFile() throws IOException { + testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + Function<GenericData.Record, GenericData.Record> transform = + record -> { + index += 1; + if (record.get("_long") != null) { + record.put("_long", index); + } + + if (Objects.equals(record.get("_int"), ABOVE_INT_COL_MAX_VALUE)) { + record.put("_int", ABOVE_INT_COL_MAX_VALUE - 1); + } + + return record; + }; + + int numRecords = 1000; + allRecords = + RandomData.generateList(COMPLEX_SCHEMA, numRecords, 0).stream() + .map(transform) + .collect(Collectors.toList()); + + try (FileAppender<GenericData.Record> writer = + Parquet.write(Files.localOutput(testFile)) + .createWriterFunc(ParquetAvroWriter::buildWriter) + .schema(COMPLEX_SCHEMA) + .set(PARQUET_PAGE_SIZE_BYTES, "500") + .set(PARQUET_ROW_GROUP_SIZE_BYTES, "500000") // 2 row groups + .set(PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, "1") + .set(PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, "1") + .set(PARQUET_DICT_SIZE_BYTES, "1") + .named("pages_unaligned_file") + .build()) { + writer.addAll(allRecords); + } + } + + @Parameterized.Parameters(name = "vectorized = {0}") + public static Object[] parameters() { + return new Object[] {false}; + } + + private final boolean vectorized; + + public TestSparkParquetPageSkipping(boolean vectorized) { + this.vectorized = vectorized; + } + + @Test + public void testSinglePageMatch() { + Expression filter = + Expressions.and( + Expressions.greaterThanOrEqual("_long", 57), + Expressions.lessThan("_long", 114)); // exactly page-1 -> row ranges: [57, 113] + + List<GenericData.Record> expected = 
selectRecords(allRecords, Pair.of(57, 114)); + readAndValidate(filter, expected); + } + + @Test + public void testMultiplePagesMatch() { + Expression filter = + Expressions.or( + // page-1 -> row ranges: [57, 113] + Expressions.and( + Expressions.greaterThanOrEqual("_long", 57), Expressions.lessThan("_long", 114)), + + // page-3, page-4 in row group 0 -> row ranges: [171, 284] + Expressions.and( + Expressions.greaterThanOrEqual("_long", 173), Expressions.lessThan("_long", 260))); + + List<GenericData.Record> expected = + selectRecords(allRecords, Pair.of(57, 114), Pair.of(171, 285)); + readAndValidate(filter, expected); + } + + @Test + public void testMultipleRowGroupsMatch() { + Expression filter = + Expressions.or( + // page-1 -> row ranges: [57, 113] + Expressions.and( + Expressions.greaterThanOrEqual("_long", 57), Expressions.lessThan("_long", 114)), + + // page-3, page-4 in row group 0 -> row ranges: [171, 284] + Expressions.and( + Expressions.greaterThanOrEqual("_long", 173), Expressions.lessThan("_long", 260))); + + filter = + Expressions.or( + filter, + // page-10 in row group 0 and page-0, page-1 in row group 1 -> row ranges: [570, 706] + Expressions.and( + Expressions.greaterThanOrEqual("_long", 572), Expressions.lessThan("_long", 663))); + + List<GenericData.Record> expected = + selectRecords(allRecords, Pair.of(57, 114), Pair.of(171, 285), Pair.of(570, 707)); + readAndValidate(filter, expected); + } + + @Test + public void testOnlyFilterPagesOnOneRowGroup() { + Expression filter = + Expressions.and( + Expressions.greaterThanOrEqual("_long", 57), + Expressions.lessThan("_long", 114)); // exactly page-1 -> row ranges: [57, 113] + + filter = + Expressions.or( + filter, + // page-9, page-10 in row group 0 -> row ranges: [513, 592] + // and all pages in row group 1 + Expressions.greaterThanOrEqual("_long", 569)); + + // some pages of row group 0 and all pages of row group 1 + List<GenericData.Record> expected = + selectRecords(allRecords, Pair.of(57, 114), Pair.of(513, 593), Pair.of(593, 1000)); + + readAndValidate(filter, expected); + } + + @Test + public void testNoRowsMatch() { + Expression filter = + Expressions.and( + Expressions.and( + Expressions.greaterThan("_long", 40), Expressions.lessThan("_long", 46)), + Expressions.equal("_int", ABOVE_INT_COL_MAX_VALUE)); + + readAndValidate(filter, ImmutableList.of()); + } + + @Test + public void testAllRowsMatch() { + Expression filter = Expressions.greaterThanOrEqual("_long", Long.MIN_VALUE); + readAndValidate(filter, allRecords); + } + + private Schema readSchema() { + return vectorized ? PRIMITIVES_SCHEMA : COMPLEX_SCHEMA; + } + + private void readAndValidate(Expression filter, List<GenericData.Record> expected) { + Schema projected = readSchema(); + + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)) + .project(projected) + .filter(filter) + .useColumnIndexFilter(true); + + Types.StructType struct = projected.asStruct(); + + if (vectorized) { + CloseableIterable<ColumnarBatch> batches = + builder + .createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader( + projected, type, ImmutableMap.of(), null)) + .build(); + + Iterator<GenericData.Record> expectedIterator = expected.iterator(); + for (ColumnarBatch batch : batches) { + TestHelpers.assertEqualsBatch(struct, expectedIterator, batch); + } + + Assert.assertFalse( + "Should not have remaining expected records", expectedIterator.hasNext()); + } else { + CloseableIterable<InternalRow> reader = + builder + .createReaderFunc(type -> SparkParquetReaders.buildReader(projected, type)) + .build(); + CloseableIterator<InternalRow> actualRows = reader.iterator(); + + for (GenericData.Record record : expected) { + Assert.assertTrue("Should have expected number of rows", actualRows.hasNext()); + TestHelpers.assertEqualsUnsafe(struct, record, actualRows.next()); + } + + Assert.assertFalse("Should not have extra rows", actualRows.hasNext()); + } + } + + private List<GenericData.Record> selectRecords( + List<GenericData.Record> records, Pair<Integer, Integer>... ranges) { + return Arrays.stream(ranges) + .map(range -> records.subList(range.first(), range.second())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } +}