diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java index e1ae750332..29235688a8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java @@ -29,14 +29,14 @@ /** * Iterator implementation for page indexes. */ -class IndexIterator implements PrimitiveIterator.OfInt { +public class IndexIterator implements PrimitiveIterator.OfInt { public static final PrimitiveIterator.OfInt EMPTY = IntStream.empty().iterator(); private int index; private final int endIndex; private final IntPredicate filter; private final IntUnaryOperator translator; - static PrimitiveIterator.OfInt all(int pageCount) { + public static PrimitiveIterator.OfInt all(int pageCount) { return new IndexIterator(0, pageCount, i -> true, i -> i); } @@ -44,7 +44,7 @@ static PrimitiveIterator.OfInt all(ColumnIndexBase.ValueComparator comparator return new IndexIterator(0, comparator.arrayLength(), i -> true, comparator::translate); } - static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) { + public static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) { return new IndexIterator(0, pageCount, filter, i -> i); } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 743f42de66..aabada67fc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -92,7 +92,7 @@ public String toString() { } } - static final RowRanges EMPTY = new RowRanges(Collections.emptyList()); + public static final RowRanges EMPTY = new RowRanges(Collections.emptyList()); private final List ranges; @@ -115,7 +115,7 @@ private RowRanges(List ranges) { * @param rowCount a single row count * @return an immutable RowRanges */ - static RowRanges createSingle(long rowCount) { + public static RowRanges createSingle(long rowCount) { return new RowRanges(new Range(0L, rowCount - 1L)); } @@ -137,7 +137,7 @@ static RowRanges createSingle(long rowCount) { * @param offsetIndex offsetIndex * @return a mutable RowRanges */ - static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) { + public static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) { RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); @@ -146,18 +146,22 @@ static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, Offs return ranges; } - /* + /** * Calculates the union of the two specified RowRanges object. The union of two range is calculated if there are no * elements between them. Otherwise, the two disjunct ranges are stored separately. + *
    * For example:
-   * [113, 241] ∪ [221, 340] = [113, 330]
+   * [113, 241] ∪ [221, 340] = [113, 340]
    * [113, 230] ∪ [231, 340] = [113, 340]
    * while
    * [113, 230] ∪ [232, 340] = [113, 230], [232, 340]
-   *
+   * 
* The result RowRanges object will contain all the row indexes that were contained in one of the specified objects. + * @param left left RowRanges + * @param right right RowRanges + * @return a mutable RowRanges contains all the row indexes that were contained in one of the specified objects */ - static RowRanges union(RowRanges left, RowRanges right) { + public static RowRanges union(RowRanges left, RowRanges right) { RowRanges result = new RowRanges(); Iterator it1 = left.ranges.iterator(); Iterator it2 = right.ranges.iterator(); @@ -186,17 +190,20 @@ static RowRanges union(RowRanges left, RowRanges right) { return result; } - /* + /** * Calculates the intersection of the two specified RowRanges object. Two ranges intersect if they have common * elements otherwise the result is empty. + *
    * For example:
    * [113, 241] ∩ [221, 340] = [221, 241]
    * while
-   * [113, 230] ∩ [231, 340] = 
-   *
-   * The result RowRanges object will contain all the row indexes there were contained in both of the specified objects
+   * [113, 230] ∩ [231, 340] = <EMPTY>
+   * 
+ * @param left left RowRanges + * @param right right RowRanges + * @return a mutable RowRanges contains all the row indexes that were contained in both of the specified objects */ - static RowRanges intersection(RowRanges left, RowRanges right) { + public static RowRanges intersection(RowRanges left, RowRanges right) { RowRanges result = new RowRanges(); int rightIndex = 0; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 8f51175028..7fa71cb618 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -1011,6 +1012,35 @@ public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException { } RowRanges rowRanges = getRowRanges(blockIndex); + return readFilteredRowGroup(blockIndex, rowRanges); + } + + /** + * Reads all the columns requested from the specified row group. It may skip specific pages based on the + * {@code rowRanges} passed in. As the rows are not aligned among the pages of the different columns row + * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details. + * + * @param blockIndex the index of the requested block + * @param rowRanges the row ranges to be read from the requested block + * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block + * @throws IOException if an error occurs while reading + * @throws IllegalArgumentException if the {@code blockIndex} is invalid or the {@code rowRanges} is null + */ + public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges rowRanges) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + throw new IllegalArgumentException(String.format("Invalid block index %s, the valid block index range are: " + + "[%s, %s]", blockIndex, 0, blocks.size() - 1)); + } + + if (Objects.isNull(rowRanges)) { + throw new IllegalArgumentException("RowRanges must not be null"); + } + + BlockMetaData block = blocks.get(blockIndex); + if (block.getRowCount() == 0L) { + return null; + } + long rowCount = rowRanges.rowCount(); if (rowCount == 0) { // There are no matching rows -> returning null @@ -1130,7 +1160,7 @@ private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageRea } } - private ColumnIndexStore getColumnIndexStore(int blockIndex) { + public ColumnIndexStore getColumnIndexStore(int blockIndex) { ColumnIndexStore ciStore = blockIndexStores.get(blockIndex); if (ciStore == null) { ciStore = ColumnIndexStoreImpl.create(this, blocks.get(blockIndex), paths.keySet());