Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@
*/
package org.apache.parquet.column.page;

import org.apache.parquet.Preconditions;

/**
* one data page in a chunk
*/
abstract public class DataPage extends Page {

private final int valueCount;
private final long firstRowIndex;

DataPage(int compressedSize, int uncompressedSize, int valueCount) {
super(compressedSize, uncompressedSize);
this.valueCount = valueCount;
this.firstRowIndex = -1;
}

DataPage(int compressedSize, int uncompressedSize, int valueCount, long firstRowIndex) {
super(compressedSize, uncompressedSize);
Preconditions.checkArgument(firstRowIndex >= 0, "First row index {} should be non-negative", firstRowIndex);
this.valueCount = valueCount;
this.firstRowIndex = firstRowIndex;
}

/**
Expand All @@ -37,6 +48,20 @@ public int getValueCount() {
return valueCount;
}

/**
* @return the index of the first row in this page
* @throws IllegalStateException
* if no row synchronization is required

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) I would use a different wording in the comment and in the exception as well, for example:

  • row synchronization not supported
  • row synchronization not available
  • row synchronization not possible
  • row synchronization [mode] not enabled
  • row synchronization [mode] not active

Could you also give a few hints about when this happens or what this means?

* @see PageReadStore#isRowSynchronizationRequired()
*/
public long getFirstRowIndex() {
if (firstRowIndex < 0) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a way to query this state without relying on an exception being thrown?

throw new IllegalStateException(
"No row synchronization is required; all pages shall be read.");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a large amount of IllegalStateException-s scattered around the code with very similar texts about row synchronization not being required (but still being a little bit cryptic about what this means). Could you please create a separate Exception class for these so that it's not repeated all over the code with minor differences in wording? This would also allow a more verbose centralized description in the javadoc of the exception.

}
return firstRowIndex;
}

public abstract <T> T accept(Visitor<T> visitor);

public static interface Visitor<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statis
this.valuesEncoding = valuesEncoding;
}

/**
* @param bytes the bytes for this page
* @param valueCount count of values in this page
* @param uncompressedSize the uncompressed size of the page
* @param firstRowIndex the index of the first row in this page
* @param statistics of the page's values (max, min, num_null)
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex,
Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount, firstRowIndex);
this.bytes = bytes;
this.statistics = statistics;
this.rlEncoding = rlEncoding;
this.dlEncoding = dlEncoding;
this.valuesEncoding = valuesEncoding;
}

/**
* @return the bytes for the page
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,32 @@ public static DataPageV2 uncompressed(
false);
}

/**
* @param rowCount count of rows
* @param nullCount count of nulls
* @param valueCount count of values
* @param firstRowIndex the index of the first row in this page
* @param repetitionLevels RLE encoded repetition levels
* @param definitionLevels RLE encoded definition levels
* @param dataEncoding encoding for the data
* @param data data encoded with dataEncoding
* @param statistics optional statistics for this page
* @return an uncompressed page
*/
public static DataPageV2 uncompressed(
int rowCount, int nullCount, int valueCount, long firstRowIndex,
BytesInput repetitionLevels, BytesInput definitionLevels,
Encoding dataEncoding, BytesInput data,
Statistics<?> statistics) {
return new DataPageV2(
rowCount, nullCount, valueCount, firstRowIndex,
repetitionLevels, definitionLevels,
dataEncoding, data,
Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
statistics,
false);
}

/**
* @param rowCount count of rows
* @param nullCount count of nulls
Expand Down Expand Up @@ -104,6 +130,25 @@ public DataPageV2(
this.isCompressed = isCompressed;
}

private DataPageV2(
int rowCount, int nullCount, int valueCount, long firstRowIndex,
BytesInput repetitionLevels, BytesInput definitionLevels,
Encoding dataEncoding, BytesInput data,
int uncompressedSize,
Statistics<?> statistics,
boolean isCompressed) {
super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize,
valueCount, firstRowIndex);
this.rowCount = rowCount;
this.nullCount = nullCount;
this.repetitionLevels = repetitionLevels;
this.definitionLevels = definitionLevels;
this.dataEncoding = dataEncoding;
this.data = data;
this.statistics = statistics;
this.isCompressed = isCompressed;
}

public int getRowCount() {
return rowCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.parquet.column.page;

import java.util.PrimitiveIterator;
import org.apache.parquet.column.ColumnDescriptor;

/**
Expand All @@ -40,4 +41,26 @@ public interface PageReadStore {
*/
long getRowCount();

/**
* Returns the indexes of the rows to be read/built. All the rows which index is not returned shall be skipped.
*
* @return the incremental iterator of the row indexes
* @throws IllegalStateException
* if no row synchronization is required
* @see #isRowSynchronizationRequired()
*/
default PrimitiveIterator.OfLong getRowIndexes() {
throw new IllegalStateException("Row synchronization is not required; row indexes are not available");
}

/**
* If row synchronization is required then some values might have to be skipped to get the rows in synch between the
* pages.
*
* @return {@code true} if row synchronization is required; {@code false} otherwise
* @see DataPage#getFirstRowIndex()
*/
default boolean isRowSynchronizationRequired() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Enabled or Active may be a better word than Required.

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;

class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
private static class BinaryColumnIndex extends ColumnIndexBase {
private static class BinaryColumnIndex extends ColumnIndexBase<Binary> {
private Binary[] minValues;
private Binary[] maxValues;

Expand All @@ -54,6 +55,28 @@ String getMinValueAsString(int pageIndex) {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}

@Override
@SuppressWarnings("unchecked")
<T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
return (Statistics<T>) new Statistics<Binary>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
}

@Override
ValueComparator createValueComparator(Object value) {
final Binary v = (Binary) value;
return new ValueComparator() {
@Override
int compareValueToMin(int arrayIndex) {
return comparator.compare(v, minValues[arrayIndex]);
}

@Override
int compareValueToMax(int arrayIndex) {
return comparator.compare(v, maxValues[arrayIndex]);
}
};
}
}

private final List<Binary> minValues = new ArrayList<>();
Expand All @@ -76,8 +99,8 @@ private static ByteBuffer convert(Binary value) {

@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
minValues.add(min == null ? null : convert(min));
maxValues.add(max == null ? null : convert(max));
minValues.add(convert(min));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this check become unnecessary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ColumnIndexBuilder.fill has changed. Now, we don't add null values for null pages but skip storing min/max values for them.

maxValues.add(convert(max));
}

@Override
Expand All @@ -87,7 +110,7 @@ void addMinMax(Object min, Object max) {
}

@Override
ColumnIndexBase createColumnIndex(PrimitiveType type) {
ColumnIndexBase<Binary> createColumnIndex(PrimitiveType type) {
BinaryColumnIndex columnIndex = new BinaryColumnIndex(type);
columnIndex.minValues = minValues.toArray(new Binary[minValues.size()]);
columnIndex.maxValues = maxValues.toArray(new Binary[maxValues.size()]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.parquet.internal.column.columnindex;

import java.nio.ByteBuffer;

import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;
Expand All @@ -28,7 +28,7 @@
import it.unimi.dsi.fastutil.booleans.BooleanList;

class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
private static class BooleanColumnIndex extends ColumnIndexBase {
private static class BooleanColumnIndex extends ColumnIndexBase<Boolean> {
private boolean[] minValues;
private boolean[] maxValues;

Expand All @@ -55,6 +55,28 @@ String getMinValueAsString(int pageIndex) {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}

@Override
@SuppressWarnings("unchecked")
<T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
return (Statistics<T>) new Statistics<Boolean>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
}

@Override
ValueComparator createValueComparator(Object value) {
final boolean v = (boolean) value;
return new ValueComparator() {
@Override
int compareValueToMin(int arrayIndex) {
return comparator.compare(v, minValues[arrayIndex]);
}

@Override
int compareValueToMax(int arrayIndex) {
return comparator.compare(v, maxValues[arrayIndex]);
}
};
}
}

private final BooleanList minValues = new BooleanArrayList();
Expand All @@ -70,18 +92,18 @@ private static ByteBuffer convert(boolean value) {

@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
minValues.add(min == null ? false : convert(min));
maxValues.add(max == null ? false : convert(max));
minValues.add(convert(min));
maxValues.add(convert(max));
}

@Override
void addMinMax(Object min, Object max) {
minValues.add(min == null ? false : (boolean) min);
maxValues.add(max == null ? false : (boolean) max);
minValues.add((boolean) min);
maxValues.add((boolean) max);
}

@Override
ColumnIndexBase createColumnIndex(PrimitiveType type) {
ColumnIndexBase<Boolean> createColumnIndex(PrimitiveType type) {
BooleanColumnIndex columnIndex = new BooleanColumnIndex(type);
columnIndex.minValues = minValues.toBooleanArray();
columnIndex.maxValues = maxValues.toBooleanArray();
Expand Down
Loading