Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@
*/
package org.apache.parquet.column.page;

import org.apache.parquet.Preconditions;

/**
* one data page in a chunk
*/
abstract public class DataPage extends Page {

private final int valueCount;
private final long firstRowIndex;

DataPage(int compressedSize, int uncompressedSize, int valueCount) {
super(compressedSize, uncompressedSize);
this.valueCount = valueCount;
this.firstRowIndex = -1;
}

DataPage(int compressedSize, int uncompressedSize, int valueCount, long firstRowIndex) {
super(compressedSize, uncompressedSize);
Preconditions.checkArgument(firstRowIndex >= 0, "First row index {} should be non-negative", firstRowIndex);
this.valueCount = valueCount;
this.firstRowIndex = firstRowIndex;
}

/**
Expand All @@ -37,6 +48,20 @@ public int getValueCount() {
return valueCount;
}

/**
* @return the index of the first row in this page
* @throws IllegalStateException
* if no row synchronization is required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) I would use a different wording in the comment and in the exception as well, for example:

  • row synchronization not supported
  • row synchronization not available
  • row synchronization not possible
  • row synchronization [mode] not enabled
  • row synchronization [mode] not active

Could you also give a few hints about when this happens or what this means?

* @see PageReadStore#isRowSynchronizationRequired()
*/
public long getFirstRowIndex() {
if (firstRowIndex < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a way to query this state without relying on an exception being thrown?

throw new IllegalStateException(
"No row synchronization is required; all pages shall be read.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a large amount of IllegalStateException-s scattered around the code with very similar texts about row synchronization not being required (but still being a little bit cryptic about what this means). Could you please create a separate Exception class for these so that it's not repeated all over the code with minor differences in wording? This would also allow a more verbose centralized description in the javadoc of the exception.

}
return firstRowIndex;
}

public abstract <T> T accept(Visitor<T> visitor);

public static interface Visitor<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statis
this.valuesEncoding = valuesEncoding;
}

/**
* @param bytes the bytes for this page
* @param valueCount count of values in this page
* @param uncompressedSize the uncompressed size of the page
* @param firstRowIndex the index of the first row in this page
* @param statistics of the page's values (max, min, num_null)
* @param rlEncoding the repetition level encoding for this page
* @param dlEncoding the definition level encoding for this page
* @param valuesEncoding the values encoding for this page
*/
public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex,
Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount, firstRowIndex);
this.bytes = bytes;
this.statistics = statistics;
this.rlEncoding = rlEncoding;
this.dlEncoding = dlEncoding;
this.valuesEncoding = valuesEncoding;
}

/**
* @return the bytes for the page
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,32 @@ public static DataPageV2 uncompressed(
false);
}

/**
* @param rowCount count of rows
* @param nullCount count of nulls
* @param valueCount count of values
* @param firstRowIndex the index of the first row in this page
* @param repetitionLevels RLE encoded repetition levels
* @param definitionLevels RLE encoded definition levels
* @param dataEncoding encoding for the data
* @param data data encoded with dataEncoding
* @param statistics optional statistics for this page
* @return an uncompressed page
*/
public static DataPageV2 uncompressed(
int rowCount, int nullCount, int valueCount, long firstRowIndex,
BytesInput repetitionLevels, BytesInput definitionLevels,
Encoding dataEncoding, BytesInput data,
Statistics<?> statistics) {
return new DataPageV2(
rowCount, nullCount, valueCount, firstRowIndex,
repetitionLevels, definitionLevels,
dataEncoding, data,
Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
statistics,
false);
}

/**
* @param rowCount count of rows
* @param nullCount count of nulls
Expand Down Expand Up @@ -104,6 +130,25 @@ public DataPageV2(
this.isCompressed = isCompressed;
}

private DataPageV2(
int rowCount, int nullCount, int valueCount, long firstRowIndex,
BytesInput repetitionLevels, BytesInput definitionLevels,
Encoding dataEncoding, BytesInput data,
int uncompressedSize,
Statistics<?> statistics,
boolean isCompressed) {
super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize,
valueCount, firstRowIndex);
this.rowCount = rowCount;
this.nullCount = nullCount;
this.repetitionLevels = repetitionLevels;
this.definitionLevels = definitionLevels;
this.dataEncoding = dataEncoding;
this.data = data;
this.statistics = statistics;
this.isCompressed = isCompressed;
}

public int getRowCount() {
return rowCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.parquet.column.page;

import java.util.PrimitiveIterator;
import org.apache.parquet.column.ColumnDescriptor;

/**
Expand All @@ -40,4 +41,26 @@ public interface PageReadStore {
*/
long getRowCount();

/**
* Returns the indexes of the rows to be read/built. All the rows which index is not returned shall be skipped.
*
* @return the incremental iterator of the row indexes
* @throws IllegalStateException
* if no row synchronization is required
* @see #isRowSynchronizationRequired()
*/
default PrimitiveIterator.OfLong getRowIndexes() {
throw new IllegalStateException("Row synchronization is not required; row indexes are not available");
}

/**
* If row synchronization is required then some values might have to be skipped to get the rows in synch between the
* pages.
*
* @return {@code true} if row synchronization is required; {@code false} otherwise
* @see DataPage#getFirstRowIndex()
*/
default boolean isRowSynchronizationRequired() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Enabled or Active may be a better word than Required.

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.internal.filter2.columnindex;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import org.apache.parquet.filter2.predicate.Operators.Lt;
import org.apache.parquet.filter2.predicate.Operators.LtEq;
import org.apache.parquet.filter2.predicate.Operators.Not;
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;
import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.hadoop.metadata.ColumnPath;

/**
* Filter implementation based on column indexes.
* No filtering will be applied for columns where no column index is available.
* No filtering will be applied at all if no offset index is available for any of the columns.
*/
public class ColumnIndexFilter implements Visitor<RowRanges> {
private final ColumnIndexStore columnIndexStore;
private final List<ColumnPath> columns;
private final long rowCount;

public static RowRanges calculateRowRanges(FilterCompat.Filter filter, ColumnIndexStore columnIndexStore,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you describe what this function does?

Collection<ColumnPath> paths, long rowCount) {
return filter.accept(new FilterCompat.Visitor<RowRanges>() {
@Override
public RowRanges visit(FilterPredicateCompat filterPredicateCompat) {
return filterPredicateCompat.getFilterPredicate()
.accept(new ColumnIndexFilter(columnIndexStore, paths, rowCount));
}

@Override
public RowRanges visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
return RowRanges.single(rowCount);
}

@Override
public RowRanges visit(NoOpFilter noOpFilter) {
return RowRanges.single(rowCount);
}
});
}

private ColumnIndexFilter(ColumnIndexStore columnIndexStore, Collection<ColumnPath> paths, long rowCount) {
// TODO[GS]: Handle the case of no offsetIndex or columnIndex is available: return all rows
this.columnIndexStore = columnIndexStore;
this.columns = new ArrayList<>(paths);
this.rowCount = rowCount;
}

private RowRanges initRanges() {
return RowRanges.single(rowCount);
}

@Override
public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
return initRanges();
}

@Override
public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
return initRanges();
}

@Override
public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
return initRanges();
}

@Override
public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
return initRanges();
}

@Override
public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
return initRanges();
}

@Override
public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
return initRanges();
}

@Override
public RowRanges visit(And and) {
return initRanges();
}

@Override
public RowRanges visit(Or or) {
return initRanges();
}

@Override
public RowRanges visit(Not not) {
return initRanges();
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(UserDefined<T, U> udp) {
return initRanges();
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
LogicalNotUserDefined<T, U> udp) {
return initRanges();
}

// TODO[GS]: implement the methods by using set operations on RowRanges
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.internal.filter2.columnindex;

import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

/**
* Provides the {@link ColumnIndex} and {@link OffsetIndex} objects for a row-group.
*/
public interface ColumnIndexStore {
/**
* @param column
* the path of the column
* @return the column index for the column-chunk in the row-group
*/
ColumnIndex getColumnIndex(ColumnPath column);

/**
* @param column
* the path of the column
* @return the offset index for the column-chunk in the row-group
*/
OffsetIndex getOffsetIndex(ColumnPath column);
}
Loading