Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@
/**
* Iterator implementation for page indexes.
*/
class IndexIterator implements PrimitiveIterator.OfInt {
public class IndexIterator implements PrimitiveIterator.OfInt {
public static final PrimitiveIterator.OfInt EMPTY = IntStream.empty().iterator();
private int index;
private final int endIndex;
private final IntPredicate filter;
private final IntUnaryOperator translator;

static PrimitiveIterator.OfInt all(int pageCount) {
public static PrimitiveIterator.OfInt all(int pageCount) {
return new IndexIterator(0, pageCount, i -> true, i -> i);
}

static PrimitiveIterator.OfInt all(ColumnIndexBase<?>.ValueComparator comparator) {
return new IndexIterator(0, comparator.arrayLength(), i -> true, comparator::translate);
}

static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) {
public static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) {
return new IndexIterator(0, pageCount, filter, i -> i);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public String toString() {
}
}

static final RowRanges EMPTY = new RowRanges(Collections.emptyList());
public static final RowRanges EMPTY = new RowRanges(Collections.emptyList());

private final List<Range> ranges;

Expand All @@ -115,7 +115,7 @@ private RowRanges(List<Range> ranges) {
* @param rowCount a single row count
* @return an immutable RowRanges
*/
static RowRanges createSingle(long rowCount) {
public static RowRanges createSingle(long rowCount) {
return new RowRanges(new Range(0L, rowCount - 1L));
}

Expand All @@ -137,7 +137,7 @@ static RowRanges createSingle(long rowCount) {
* @param offsetIndex offsetIndex
* @return a mutable RowRanges
*/
static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) {
public static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) {
RowRanges ranges = new RowRanges();
while (pageIndexes.hasNext()) {
int pageIndex = pageIndexes.nextInt();
Expand All @@ -146,18 +146,22 @@ static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, Offs
return ranges;
}

/*
/**
* Calculates the union of the two specified RowRanges object. The union of two range is calculated if there are no
* elements between them. Otherwise, the two disjunct ranges are stored separately.
* <pre>
* For example:
* [113, 241] ∪ [221, 340] = [113, 330]
* [113, 241] ∪ [221, 340] = [113, 340]
* [113, 230] ∪ [231, 340] = [113, 340]
* while
* [113, 230] ∪ [232, 340] = [113, 230], [232, 340]
*
* </pre>
* The result RowRanges object will contain all the row indexes that were contained in one of the specified objects.
* @param left left RowRanges
* @param right right RowRanges
* @return a mutable RowRanges contains all the row indexes that were contained in one of the specified objects
*/
static RowRanges union(RowRanges left, RowRanges right) {
public static RowRanges union(RowRanges left, RowRanges right) {
RowRanges result = new RowRanges();
Iterator<Range> it1 = left.ranges.iterator();
Iterator<Range> it2 = right.ranges.iterator();
Expand Down Expand Up @@ -186,17 +190,20 @@ static RowRanges union(RowRanges left, RowRanges right) {
return result;
}

/*
/**
* Calculates the intersection of the two specified RowRanges object. Two ranges intersect if they have common
* elements otherwise the result is empty.
* <pre>
* For example:
* [113, 241] ∩ [221, 340] = [221, 241]
* while
* [113, 230] ∩ [231, 340] = <EMPTY>
*
* The result RowRanges object will contain all the row indexes there were contained in both of the specified objects
* [113, 230] ∩ [231, 340] = &lt;EMPTY&gt;
* </pre>
* @param left left RowRanges
* @param right right RowRanges
* @return a mutable RowRanges contains all the row indexes that were contained in both of the specified objects
*/
static RowRanges intersection(RowRanges left, RowRanges right) {
public static RowRanges intersection(RowRanges left, RowRanges right) {
RowRanges result = new RowRanges();

int rightIndex = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1011,6 +1012,35 @@ public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {
}

RowRanges rowRanges = getRowRanges(blockIndex);
return readFilteredRowGroup(blockIndex, rowRanges);
}

/**
* Reads all the columns requested from the specified row group. It may skip specific pages based on the
* {@code rowRanges} passed in. As the rows are not aligned among the pages of the different columns row
* synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
*
* @param blockIndex the index of the requested block
* @param rowRanges the row ranges to be read from the requested block
* @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
* @throws IOException if an error occurs while reading
* @throws IllegalArgumentException if the {@code blockIndex} is invalid or the {@code rowRanges} is null
*/
public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges rowRanges) throws IOException {
if (blockIndex < 0 || blockIndex >= blocks.size()) {
throw new IllegalArgumentException(String.format("Invalid block index %s, the valid block index range are: " +
"[%s, %s]", blockIndex, 0, blocks.size() - 1));
}

if (Objects.isNull(rowRanges)) {
throw new IllegalArgumentException("RowRanges must not be null");
}

BlockMetaData block = blocks.get(blockIndex);
if (block.getRowCount() == 0L) {
throw new ParquetEmptyBlockException("Illegal row group of 0 rows");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the reader simply skips empty row groups instead of throw. Could you change this to be consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked PARQUET-2291, seems we only skip empty row group when using reader as a iterator, right? We skip empty row group in readNextRowGroup() , but not when the user passes in a blockIndex, and this newly introduced method also requires the user pass in a blockIndex.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why this would throw an exception. This method is intended to allow building an external iterator. I don't think anyone would ever want to fail if there were an empty row group, even if the reader thinks it shouldn't have been written. I think this should return null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, readRowGroup(int blockIndex) and readFilteredRowGroup(int blockIndex) also throw an exception when handling empty row groups, should we make them also return null instead of throwing an exception to be consistent?

}

long rowCount = rowRanges.rowCount();
if (rowCount == 0) {
// There are no matching rows -> returning null
Expand Down Expand Up @@ -1130,7 +1160,7 @@ private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageRea
}
}

private ColumnIndexStore getColumnIndexStore(int blockIndex) {
public ColumnIndexStore getColumnIndexStore(int blockIndex) {
ColumnIndexStore ciStore = blockIndexStores.get(blockIndex);
if (ciStore == null) {
ciStore = ColumnIndexStoreImpl.create(this, blocks.get(blockIndex), paths.keySet());
Expand Down