BaseBatchReader.java
@@ -42,9 +42,15 @@ protected BaseBatchReader(List<VectorizedReader<?>> readers) {
@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
setRowGroupInfo(pageStore, metaData);
}

@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
for (VectorizedArrowReader reader : readers) {
if (reader != null) {
- reader.setRowGroupInfo(pageStore, metaData, rowPosition);
+ reader.setRowGroupInfo(pageStore, metaData);
}
}
}
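The net effect of this hunk is that callers no longer thread the row group's starting row position into the batch reader. A small before/after sketch of the delegated call (illustrative only; pageStore, metaData, and rowPosition are the names used above):

// Before this change: the caller supplied the row group's starting row position.
reader.setRowGroupInfo(pageStore, metaData, rowPosition);
// After: the two-argument overload is enough; readers that still need the start
// row recover it from the PageReadStore (see getRowIndexOffset() further down).
reader.setRowGroupInfo(pageStore, metaData);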
VectorizedArrowReader.java
@@ -432,6 +432,11 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF
@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
setRowGroupInfo(source, metadata);
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath()));
this.dictionary =
vectorizedColumnIterator.setRowGroupInfo(
@@ -475,6 +480,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
return "NullReader";
@@ -541,7 +550,13 @@ private static NullabilityHolder newNullabilityHolder(int size) {
@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
- this.rowStart = rowPosition;
+ setRowGroupInfo(source, metadata);
}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
this.rowStart = source.getRowIndexOffset().orElse(0L);
}

@Override
@@ -586,6 +601,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
return String.format("ConstantReader: %s", value);
@@ -613,6 +632,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
return "DeletedVectorReader";
BaseColumnIterator.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.parquet;

import java.util.Optional;
import java.util.PrimitiveIterator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.page.DataPage;
@@ -26,6 +28,7 @@
@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class BaseColumnIterator {
protected final ColumnDescriptor desc;
protected final int definitionLevel;

// state reset for each row group
protected PageReader pageSource = null;
@@ -34,15 +37,31 @@ public abstract class BaseColumnIterator {
protected long advanceNextPageCount = 0L;
protected Dictionary dictionary;

// state for page skipping
protected boolean needsSynchronizing;
protected PrimitiveIterator.OfLong rowIndexes;
protected long currentRowIndex;
protected int numValuesToSkip;

protected BaseColumnIterator(ColumnDescriptor descriptor) {
this.desc = descriptor;
this.definitionLevel = desc.getMaxDefinitionLevel() - 1;
}

public void setPageSource(PageReader source) {
setPageSource(source, Optional.empty());
}

public void setPageSource(
PageReader source, Optional<PrimitiveIterator.OfLong> optionalRowIndexes) {
this.pageSource = source;
this.triplesCount = source.getTotalValueCount();
this.triplesRead = 0L;
this.advanceNextPageCount = 0L;
this.needsSynchronizing = optionalRowIndexes.isPresent();
if (needsSynchronizing) {
this.rowIndexes = optionalRowIndexes.get();
}
BasePageIterator pageIterator = pageIterator();
pageIterator.reset();
dictionary = ParquetUtil.readDictionary(desc, pageSource);
Expand All @@ -60,6 +79,17 @@ protected void advance() {
if (page != null) {
pageIterator.setPage(page);
this.advanceNextPageCount += pageIterator.currentPageCount();

if (needsSynchronizing) {
long firstRowIndex =
page.getFirstRowIndex()
.orElseThrow(
() ->
new IllegalStateException(
"Index of first row in page is not available"));
this.numValuesToSkip = 0;
this.currentRowIndex = firstRowIndex - 1;
}
} else {
return;
}
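With these hooks in place, setPageSource can be handed the row positions that survive filtering so the iterator knows which rows to keep. One possible wiring (an assumption about the intended caller, not part of this diff; fileReader, columnDescriptor, and columnIterator are hypothetical names) using parquet-mr's column-index filtering API:

// When parquet-mr filters a row group using column indexes, the resulting
// PageReadStore reports which row indexes survived; passing that iterator to
// setPageSource enables the page-skipping path added above.
PageReadStore filteredRowGroup = fileReader.readNextFilteredRowGroup();
columnIterator.setPageSource(
    filteredRowGroup.getPageReader(columnDescriptor), filteredRowGroup.getRowIndexes());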
BasePageIterator.java
@@ -71,6 +71,10 @@ protected void reset() {
this.hasNext = false;
}

protected void skip(int numValuesToSkip) {
throw new UnsupportedOperationException();
}

protected abstract void initDataReader(
Encoding dataEncoding, ByteBufferInputStream in, int valueCount);

ColumnIterator.java
@@ -18,7 +18,10 @@
*/
package org.apache.iceberg.parquet;

import java.util.Optional;
import java.util.PrimitiveIterator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.io.api.Binary;

public abstract class ColumnIterator<T> extends BaseColumnIterator implements TripleIterator<T> {
@@ -89,6 +92,7 @@ public Binary next() {
}

private final PageIterator<T> pageIterator;
private long targetRowIndex = Long.MIN_VALUE;

private ColumnIterator(ColumnDescriptor desc, String writerVersion) {
super(desc);
@@ -160,4 +164,49 @@ public <N> N nextNull() {
protected BasePageIterator pageIterator() {
return pageIterator;
}

@Override
public void setPageSource(PageReader source) {
setPageSource(source, Optional.empty());
}

@Override
public void setPageSource(PageReader source, Optional<PrimitiveIterator.OfLong> rowIndexes) {
super.setPageSource(source, rowIndexes);
if (rowIndexes.isPresent()) {
this.targetRowIndex = Long.MIN_VALUE;
}
}

@Override
public boolean needsSynchronizing() {
return needsSynchronizing;
}

@Override
public void synchronize() {
numValuesToSkip = 0;
while (hasNext()) {
advance();
if (pageIterator.currentRepetitionLevel() == 0) {
currentRowIndex += 1;
if (currentRowIndex > targetRowIndex) {
targetRowIndex = rowIndexes.hasNext() ? rowIndexes.nextLong() : Long.MAX_VALUE;
}
}

if (currentRowIndex < targetRowIndex) {
triplesRead += 1;
if (pageIterator.currentDefinitionLevel() > definitionLevel) {
numValuesToSkip += 1;
}

pageIterator.advance();
} else {
break;
}
}

pageIterator.skip(numValuesToSkip);
}
}
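To make synchronize() concrete, here is a short walk-through with made-up numbers (the page offset and row indexes are hypothetical, not taken from this change):

// Suppose the current page starts at row index 10, so advance() has reset
// currentRowIndex to 9, and the surviving row indexes are {10, 13}.
// - First synchronize() call: the pending triple has repetition level 0, so
//   currentRowIndex becomes 10; since 10 > Long.MIN_VALUE, targetRowIndex is
//   pulled from rowIndexes and becomes 10; 10 < 10 is false, so the loop breaks
//   and pageIterator.skip(0) is a no-op. Row 10 is then read normally.
// - Next synchronize() call: rows 11 and 12 stay below targetRowIndex = 13, so
//   their triples are consumed, triples that actually carry a value (definition
//   level equal to the column's maximum) are counted in numValuesToSkip, and all
//   of them are discarded with one pageIterator.skip(numValuesToSkip) call before
//   the loop stops at row 13.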
PageIterator.java
@@ -189,7 +189,7 @@ public <V> V nextNull() {
return null;
}

- private void advance() {
+ protected void advance() {
if (triplesRead < triplesCount) {
this.currentDL = definitionLevels.nextInt();
this.currentRL = repetitionLevels.nextInt();
@@ -202,6 +202,11 @@ private void advance() {
}
}

@Override
protected void skip(int numValuesToSkip) {
values.skip(numValuesToSkip);
}

RuntimeException handleRuntimeException(RuntimeException exception) {
if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, valueEncoding)
&& exception instanceof ArrayIndexOutOfBoundsException) {
49 changes: 40 additions & 9 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -105,6 +105,7 @@
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -114,10 +115,14 @@
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Parquet {
private Parquet() {}

private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);

private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
Sets.newHashSet(
"parquet.read.filter",
@@ -1165,7 +1170,27 @@ public <D> CloseableIterable<D> build() {
optionsBuilder.withDecryption(fileDecryptionProperties);
}

// TODO: for now, apply filter only for non-vectorized read
if (filter != null && batchedReaderFunc == null) {
MessageType type = getSchemaFromFile(fileDecryptionProperties);
Schema fileSchema = ParquetSchemaUtil.convert(type);
try {
FilterCompat.Filter convertedFilter =
ParquetFilters.convert(type, fileSchema, filter, caseSensitive);
optionsBuilder.useRecordFilter();
optionsBuilder.withRecordFilter(convertedFilter);
} catch (Exception e) {
// no record filter to use
optionsBuilder.useRecordFilter(false);
LOG.warn("ReadBuilder: filter conversion threw exception", e);
}
}

ParquetReadOptions options = optionsBuilder.build();
LOG.info(
"ReadBuilder: options: useRecordFilter: {}, recordFilter: {}",
options.useRecordFilter(),
options.getRecordFilter());

NameMapping mapping;
if (nameMapping != null) {
@@ -1218,15 +1243,7 @@ public <D> CloseableIterable<D> build() {
if (filter != null) {
// TODO: should not need to get the schema to push down before opening the file.
// Parquet should allow setting a filter inside its read support
- ParquetReadOptions decryptOptions =
- ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build();
- MessageType type;
- try (ParquetFileReader schemaReader =
- ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) {
- type = schemaReader.getFileMetaData().getSchema();
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
+ MessageType type = getSchemaFromFile(fileDecryptionProperties);
Schema fileSchema = ParquetSchemaUtil.convert(type);
builder
.useStatsFilter()
@@ -1261,6 +1278,20 @@ public <D> CloseableIterable<D> build() {

return new ParquetIterable<>(builder);
}

private MessageType getSchemaFromFile(FileDecryptionProperties fileDecryptionProperties) {
ParquetReadOptions decryptOptions =
ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build();
MessageType type;
try (ParquetFileReader schemaReader =
ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) {
type = schemaReader.getFileMetaData().getSchema();
} catch (IOException e) {
throw new RuntimeIOException(e);
}

return type;
}
}

private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
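For orientation, a hedged usage sketch of the non-vectorized read path that now also installs a parquet-mr record filter (inputFile, projectionSchema, and the predicate are illustrative; GenericParquetReaders is Iceberg's generic-record reader factory):

// With a filter set and no batchedReaderFunc, the builder converts the Iceberg
// expression via ParquetFilters.convert and registers it through
// useRecordFilter()/withRecordFilter(), so parquet-mr can evaluate the predicate
// during the scan and, where column indexes are available, skip non-matching pages.
CloseableIterable<Record> rows =
    Parquet.read(inputFile)
        .project(projectionSchema)
        .filter(Expressions.greaterThan("id", 100))
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projectionSchema, fileSchema))
        .build();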