Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,38 @@

package org.apache.spark.sql.execution.datasources.parquet;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PrimitiveIterator;

/**
* Helper class to store intermediate state while reading a Parquet column chunk.
*/
final class ParquetReadState {
/** Maximum definition level */
/** A special row range used when there is no row indexes (hence all rows must be included) */
private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE);

/**
* A special row range used when the row indexes are present AND all the row ranges have been
* processed. This serves as a sentinel at the end indicating that all rows come after the last
* row range should be skipped.
*/
private static final RowRange END_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE);

/** Iterator over all row ranges, only not-null if column index is present */
private final Iterator<RowRange> rowRanges;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does each column generate one row range?

Copy link
Member Author

@sunchao sunchao Jun 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The list of row ranges is associated with a Parquet row group. For example, let's say you have two columns c1:int and c2:bigint in the row group, and the following pages:

  row index   0        500       1000      1500
              -------------------------------
  c1 (int)    |         |         |         |
              -------------------------------
  c2 (bigint) |    |    |    |    |    |    |
              -------------------------------
              0   250  500  750  1000 1250 1500

Suppose the query is SELECT * FROM tbl WHERE c1 = 750 AND c2 = 1100
This, when applied on c1, will produce row range [500, 1000). When applied on c2, will produce row range [1000, 1250). These two will be unioned into [500, 1250) and that is the row range for the whole row group.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the illustration.


/** The current row range */
private RowRange currentRange;

/** Maximum definition level for the Parquet column */
final int maxDefinitionLevel;

/** The current index over all rows within the column chunk. This is used to check if the
* current row should be skipped by comparing against the row ranges. */
long rowId;

/** The offset in the current batch to put the next value */
int offset;

Expand All @@ -33,31 +58,108 @@ final class ParquetReadState {
/** The remaining number of values to read in the current batch */
int valuesToReadInBatch;

ParquetReadState(int maxDefinitionLevel) {
ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) {
this.maxDefinitionLevel = maxDefinitionLevel;
this.rowRanges = constructRanges(rowIndexes);
nextRange();
}

/**
* Called at the beginning of reading a new batch.
* Construct a list of row ranges from the given `rowIndexes`. For example, suppose the
* `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges:
Copy link
Contributor

@cloud-fan cloud-fan Jun 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, does the parquet reader lib give you a big array containing these indexes, or it uses an algorithm to generate the indexes on the fly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And how fast/slow the parquet reader lib can generate the indexes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gives you an iterator so yeah generating them on the fly: https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java#L253. The indexes are generated from Range which is very similar to what we defined here. I'm planning to file a JIRA in parquet-mr to just return the original Ranges so we don't have to do this step in Spark.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* `[0-2], [4-5], [7-9]`.
*/
void resetForBatch(int batchSize) {
private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
if (rowIndexes == null) {
return null;
}

List<RowRange> rowRanges = new ArrayList<>();
long currentStart = Long.MIN_VALUE;
long previous = Long.MIN_VALUE;

while (rowIndexes.hasNext()) {
long idx = rowIndexes.nextLong();
if (currentStart == Long.MIN_VALUE) {
currentStart = idx;
} else if (previous + 1 != idx) {
RowRange range = new RowRange(currentStart, previous);
rowRanges.add(range);
currentStart = idx;
}
previous = idx;
}

if (previous != Long.MIN_VALUE) {
rowRanges.add(new RowRange(currentStart, previous));
}

return rowRanges.iterator();
}

/**
* Must be called at the beginning of reading a new batch.
*/
void resetForNewBatch(int batchSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, new names (resetForNewBatch/resetForNewPage) look clear and better.

this.offset = 0;
this.valuesToReadInBatch = batchSize;
}

/**
* Called at the beginning of reading a new page.
* Must be called at the beginning of reading a new page.
*/
void resetForPage(int totalValuesInPage) {
void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) {
this.valuesToReadInPage = totalValuesInPage;
this.rowId = pageFirstRowIndex;
}

/**
* Advance the current offset to the new values.
* Returns the start index of the current row range.
*/
void advanceOffset(int newOffset) {
long currentRangeStart() {
return currentRange.start;
}

/**
* Returns the end index of the current row range.
*/
long currentRangeEnd() {
return currentRange.end;
}

/**
* Advance the current offset and rowId to the new values.
*/
void advanceOffsetAndRowId(int newOffset, long newRowId) {
valuesToReadInBatch -= (newOffset - offset);
valuesToReadInPage -= (newOffset - offset);
valuesToReadInPage -= (newRowId - rowId);
Comment on lines 134 to +135
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert newOffset - offset == newRowId - rowId?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not necessarily true: rowId tracks all the values that could be either read or skipped, while offset only tracks value that are read into the result column vector.

offset = newOffset;
rowId = newRowId;
}

/**
* Advance to the next range.
*/
void nextRange() {
if (rowRanges == null) {
currentRange = MAX_ROW_RANGE;
} else if (!rowRanges.hasNext()) {
currentRange = END_ROW_RANGE;
} else {
currentRange = rowRanges.next();
}
}

/**
* Helper struct to represent a range of row indexes `[start, end]`.
*/
private static class RowRange {
final long start;
final long end;

RowRange(long start, long end) {
this.start = start;
this.end = end;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,28 @@ public interface ParquetVectorUpdater {
* @param values destination values vector
* @param valuesReader reader to read values from
*/
void updateBatch(
void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader);

/**
* Skip a batch of `total` values from `valuesReader`.
*
* @param total total number of values to skip
* @param valuesReader reader to skip values from
*/
void skipValues(int total, VectorizedValuesReader valuesReader);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is renamed, please update the following PR description accordingly.

introduced a new API ParquetVectorUpdater.skipBatch which skips a batch of values from a Parquet value reader.

And, maybe, we had better update line 40 in this file.

Skip a batch of ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR description. Regarding the comment, IMO even though the method name is changed, the comment is still accurate in expressing what the method does.


/**
* Read a single value from `valuesReader` into `values`, at `offset`.
*
* @param offset offset in `values` to put the new value
* @param values destination value vector
* @param valuesReader reader to read values from
*/
void update(int offset, WritableColumnVector values, VectorizedValuesReader valuesReader);
void readValue(int offset, WritableColumnVector values, VectorizedValuesReader valuesReader);

/**
* Process a batch of `total` values starting from `offset` in `values`, whose null slots
Expand Down
Loading