Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.arrow.vectorized.parquet;

import java.util.Optional;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
Expand Down Expand Up @@ -55,7 +56,7 @@ public Dictionary setRowGroupInfo(PageReader store, boolean allPagesDictEncoded)
// setPageSource can result in a data page read. If that happens, we need
// to know in advance whether all the pages in the row group are dictionary encoded or not
this.vectorizedPageIterator.setAllPagesDictEncoded(allPagesDictEncoded);
super.setPageSource(store);
super.setPageSource(store, Optional.empty());
return dictionary;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
*/
package org.apache.iceberg.parquet;

import java.util.Optional;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;

@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class BaseColumnIterator {
protected final ColumnDescriptor desc;
protected final int definitionLevel;

// state reset for each row group
protected PageReader pageSource = null;
Expand All @@ -34,15 +37,28 @@ public abstract class BaseColumnIterator {
protected long advanceNextPageCount = 0L;
protected Dictionary dictionary;

// state for page skipping
protected boolean needsSynchronize;
protected long currentRowIndex;
protected int skipValues;

/**
 * Creates an iterator for the column described by {@code descriptor}.
 *
 * <p>Stores the descriptor and caches its maximum definition level minus one in
 * {@code definitionLevel}.
 *
 * @param descriptor descriptor of the column this iterator reads
 */
protected BaseColumnIterator(ColumnDescriptor descriptor) {
  int maxDefinitionLevel = descriptor.getMaxDefinitionLevel();
  this.desc = descriptor;
  this.definitionLevel = maxDefinitionLevel - 1;
}

/**
 * Sets the page source without any row ranges.
 *
 * <p>Delegates to {@link #setPageSource(PageReader, Optional)} with an empty {@code Optional},
 * which leaves {@code needsSynchronize} set to {@code false}.
 *
 * @param source page reader to read pages from
 * @deprecated use {@link #setPageSource(PageReader, Optional)} instead
 */
@Deprecated
public void setPageSource(PageReader source) {
setPageSource(source, Optional.empty());
}

public void setPageSource(PageReader source, Optional<RowRanges> rowRanges) {
this.pageSource = source;
this.triplesCount = source.getTotalValueCount();
this.triplesRead = 0L;
this.advanceNextPageCount = 0L;
this.needsSynchronize = rowRanges.isPresent();

BasePageIterator pageIterator = pageIterator();
pageIterator.reset();
dictionary = ParquetUtil.readDictionary(desc, pageSource);
Expand All @@ -60,6 +76,17 @@ protected void advance() {
if (page != null) {
pageIterator.setPage(page);
this.advanceNextPageCount += pageIterator.currentPageCount();

if (needsSynchronize) {
long firstRowIndex =
page.getFirstRowIndex()
.orElseThrow(
() ->
new IllegalArgumentException(
"Missing page first row index for synchronizing values"));
this.skipValues = 0;
this.currentRowIndex = firstRowIndex - 1;
}
} else {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ protected void reset() {
this.hasNext = false;
}

/**
 * Hook for skipping {@code skipValues} values in the current page.
 *
 * <p>This base implementation does not support skipping and always throws; page iterators that
 * support it are expected to override this method.
 *
 * @param skipValues number of values to skip
 * @throws UnsupportedOperationException always, in this base implementation
 */
protected void skip(int skipValues) {
throw new UnsupportedOperationException();
}

protected abstract void initDataReader(
Encoding dataEncoding, ByteBufferInputStream in, int valueCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
*/
package org.apache.iceberg.parquet;

import java.util.Optional;
import java.util.PrimitiveIterator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.io.api.Binary;

public abstract class ColumnIterator<T> extends BaseColumnIterator implements TripleIterator<T> {
Expand Down Expand Up @@ -89,6 +93,8 @@ public Binary next() {
}

private final PageIterator<T> pageIterator;
private PrimitiveIterator.OfLong rowIndexes;
private long targetRowIndex = Long.MIN_VALUE;

private ColumnIterator(ColumnDescriptor desc, String writerVersion) {
super(desc);
Expand Down Expand Up @@ -160,4 +166,50 @@ public <N> N nextNull() {
protected BasePageIterator pageIterator() {
return pageIterator;
}

/**
 * Sets the page source without any row ranges, so no synchronization is requested.
 *
 * <p>Delegates to {@link #setPageSource(PageReader, Optional)} with an empty {@code Optional}.
 *
 * @param source page reader to read pages from
 * @deprecated use {@link #setPageSource(PageReader, Optional)} instead; the overridden base
 *     method is deprecated, so this override is marked deprecated as well to keep the
 *     deprecation visible to callers holding a {@code ColumnIterator} reference
 */
@Deprecated
@Override
public void setPageSource(PageReader source) {
  setPageSource(source, Optional.empty());
}

/**
 * Sets the page source and, optionally, the row ranges that should be read from it.
 *
 * <p>The row-range state is reset on every call. When {@code rowRanges} is empty the row index
 * iterator is cleared, so an iterator left over from a previous page source can never be
 * consulted by a later {@link #synchronize()} call.
 *
 * @param source page reader to read pages from
 * @param rowRanges row ranges to read; when present, their iterator supplies the target row
 *     indexes used by {@link #synchronize()}
 */
@Override
public void setPageSource(PageReader source, Optional<RowRanges> rowRanges) {
  super.setPageSource(source, rowRanges);
  // Reset unconditionally: this iterator is reused, and keeping the previous source's row index
  // iterator around would retain stale ranges when the new source has none.
  this.rowIndexes = rowRanges.map(RowRanges::iterator).orElse(null);
  this.targetRowIndex = Long.MIN_VALUE;
}

/**
 * Returns the {@code needsSynchronize} flag, which the base class sets to {@code true} when
 * {@code setPageSource} was given row ranges and to {@code false} otherwise.
 *
 * @return whether this iterator was configured with row ranges
 */
@Override
public boolean needsSynchronize() {
return needsSynchronize;
}

/**
 * Advances this iterator past triples that belong to rows before the next target row index.
 *
 * <p>Target row indexes are pulled from {@code rowIndexes}; once that iterator is exhausted the
 * target becomes {@code Long.MAX_VALUE}, so every remaining triple is skipped. Skipped triples
 * are counted in {@code triplesRead}, and the number of skipped triples whose definition level
 * is above {@code definitionLevel} is passed to the page iterator's {@code skip} at the end.
 *
 * <p>NOTE(review): dereferences {@code rowIndexes}, so this presumably must only be called when
 * {@link #needsSynchronize()} is true; confirm against callers.
 */
@Override
public void synchronize() {
// Count of values to skip in the page iterator, accumulated by the loop below.
skipValues = 0;
while (hasNext()) {
// Base-class advance; the visible fragment of it shows that loading a page while
// synchronizing resets skipValues and sets currentRowIndex to the page's first row index - 1.
advance();
// A repetition level of 0 is treated as the start of a new row.
if (pageIterator.currentRepetitionLevel() == 0) {
currentRowIndex += 1;
// The current row has passed the target: fetch the next target row index, or use
// Long.MAX_VALUE when there are no more so that everything left is skipped.
if (currentRowIndex > targetRowIndex) {
targetRowIndex = rowIndexes.hasNext() ? rowIndexes.nextLong() : Long.MAX_VALUE;
}
}

if (currentRowIndex < targetRowIndex) {
// This triple belongs to a row that is not wanted: consume it without returning it.
triplesRead += 1;
// definitionLevel is the max definition level minus one, so this branch counts triples
// defined at the max level (presumably the ones with a stored value; confirm).
if (pageIterator.currentDefinitionLevel() > definitionLevel) {
skipValues += 1;
}

pageIterator.advance();
} else {
// Reached the target row: stop and leave the current triple to be read normally.
break;
}
}

// Skip the values counted above in the page iterator.
pageIterator.skip(skipValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ public <V> V nextNull() {
return null;
}

private void advance() {
/**
 * Skips {@code skipValues} values by delegating to {@code values.skip}.
 *
 * <p>NOTE(review): {@code values} is presumably the value reader for the current page; its
 * declaration is not visible here, so confirm.
 *
 * @param skipValues number of values to skip
 */
@Override
protected void skip(int skipValues) {
values.skip(skipValues);
}

protected void advance() {
if (triplesRead < triplesCount) {
this.currentDL = definitionLevels.nextInt();
this.currentRL = repetitionLevels.nextInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ public static class ReadBuilder {
private NameMapping nameMapping = null;
private ByteBuffer fileEncryptionKey = null;
private ByteBuffer fileAADPrefix = null;
private boolean useColumnIndexFilter = false;

private ReadBuilder(InputFile file) {
this.file = file;
Expand Down Expand Up @@ -1020,6 +1021,11 @@ public ReadBuilder filter(Expression newFilter) {
return this;
}

/**
 * Sets whether the column index filter should be used by the reader built from this builder.
 *
 * <p>The flag defaults to {@code false} and is passed along when the reader is built.
 * NOTE(review): presumably this enables page-level filtering based on Parquet column indexes;
 * confirm against the reader implementation.
 *
 * @param newUseColumnIndexFilter {@code true} to use the column index filter
 * @return this builder, for method chaining
 */
public ReadBuilder useColumnIndexFilter(boolean newUseColumnIndexFilter) {
this.useColumnIndexFilter = newUseColumnIndexFilter;
return this;
}

public ReadBuilder readSupport(ReadSupport<?> newFilterSupport) {
this.readSupport = newFilterSupport;
return this;
Expand Down Expand Up @@ -1139,7 +1145,8 @@ public <D> CloseableIterable<D> build() {
nameMapping,
filter,
reuseContainers,
caseSensitive);
caseSensitive,
useColumnIndexFilter);
}
}

Expand Down
Loading