Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ protected BaseBatchReader(List<VectorizedReader<?>> readers) {
@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
setRowGroupInfo(pageStore, metaData);
}

@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
for (VectorizedArrowReader reader : readers) {
if (reader != null) {
reader.setRowGroupInfo(pageStore, metaData, rowPosition);
reader.setRowGroupInfo(pageStore, metaData);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF
@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
setRowGroupInfo(source, metadata);
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath()));
this.dictionary =
vectorizedColumnIterator.setRowGroupInfo(
Expand Down Expand Up @@ -475,6 +480,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
return "NullReader";
Expand Down Expand Up @@ -541,7 +550,19 @@ private static NullabilityHolder newNullabilityHolder(int size) {
@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
this.rowStart = rowPosition;
setRowGroupInfo(source, metadata);
}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
this.rowStart =
source
.getRowIndexOffset()
.orElseThrow(
() ->
new IllegalArgumentException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is there a better Exception than IllegalArgumentException? Is IllegalStateException a bit better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see why you might consider IllegalStateException. I do think IllegalArgumentException is appropriate, because the PageReadStore is an argument to the method being called, and the problem is with the PageReadStore. I think IllegalStateException is typically used to indicate an internal inconsistency in the module.
Consider if we use Guava's Preconditions to check a condition here. The condition would be that source.getRowIndexOffset().isPresent(). The checkArgument methods throw IllegalArgumentException and "Ensures the truth of an expression involving one or more parameters to the calling method." The checkState methods throw IllegalStateException and "Ensures the truth of an expression involving the state of the calling instance, but not involving any parameters to the calling method." (my emphasis)
Of course, that just expresses the opinions of the authors of Guava. There are others who might argue that IllegalStateException is appropriate here, or neither IllegalStateException nor IllegalArgumentException. It really doesn't matter too much. I can just throw RuntimeException if you do not agree with IllegalArgumentException.

"PageReadStore does not contain row index offset"));
}

@Override
Expand Down Expand Up @@ -586,6 +607,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
return String.format("ConstantReader: %s", value);
Expand Down Expand Up @@ -613,6 +638,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
return "DeletedVectorReader";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private final ParquetValueReader<T> model;
private final long totalValues;
private final boolean reuseContainers;
private final long[] rowGroupsStartRowPos;

private int nextRowGroup = 0;
private long nextRowGroupStart = 0;
Expand All @@ -112,7 +111,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
this.model = conf.model();
this.totalValues = conf.totalValues();
this.reuseContainers = conf.reuseContainers();
this.rowGroupsStartRowPos = conf.startRowPositions();
}

@Override
Expand Down Expand Up @@ -149,11 +147,10 @@ private void advance() {
throw new RuntimeIOException(e);
}

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;

model.setPageSource(pages, rowPosition);
model.setPageSource(pages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,15 @@ public interface ParquetValueReader<T> {

List<TripleIterator<?>> columns();

/**
* @deprecated since 1.8.0, will be removed in 1.9.0; use {@link #setPageSource(PageReadStore)}
* instead.
*/
@Deprecated
void setPageSource(PageReadStore pageStore, long rowPosition);

default void setPageSource(PageReadStore pageStore) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement setPageSource(PageReadStore)");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public List<TripleIterator<?>> columns() {

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {}

@Override
public void setPageSource(PageReadStore pageStore) {}
}

static class ConstantReader<C> implements ParquetValueReader<C> {
Expand Down Expand Up @@ -176,6 +179,9 @@ public List<TripleIterator<?>> columns() {

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {}

@Override
public void setPageSource(PageReadStore pageStore) {}
}

static class PositionReader implements ParquetValueReader<Long> {
Expand All @@ -200,7 +206,18 @@ public List<TripleIterator<?>> columns() {

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
this.rowGroupStart = rowPosition;
setPageSource(pageStore);
}

@Override
public void setPageSource(PageReadStore pageStore) {
this.rowGroupStart =
pageStore
.getRowIndexOffset()
.orElseThrow(
() ->
new IllegalArgumentException(
"PageReadStore does not contain row index offset"));
this.rowOffset = -1;
}
}
Expand All @@ -221,6 +238,11 @@ protected PrimitiveReader(ColumnDescriptor desc) {

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
setPageSource(pageStore);
}

@Override
public void setPageSource(PageReadStore pageStore) {
column.setPageSource(pageStore.getPageReader(desc));
}

Expand Down Expand Up @@ -405,7 +427,12 @@ private static class OptionReader<T> implements ParquetValueReader<T> {

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
reader.setPageSource(pageStore, rowPosition);
setPageSource(pageStore);
}

@Override
public void setPageSource(PageReadStore pageStore) {
reader.setPageSource(pageStore);
}

@Override
Expand Down Expand Up @@ -450,7 +477,12 @@ protected RepeatedReader(

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
reader.setPageSource(pageStore, rowPosition);
setPageSource(pageStore);
}

@Override
public void setPageSource(PageReadStore pageStore) {
reader.setPageSource(pageStore);
}

@Override
Expand Down Expand Up @@ -569,8 +601,13 @@ protected RepeatedKeyValueReader(

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
keyReader.setPageSource(pageStore, rowPosition);
valueReader.setPageSource(pageStore, rowPosition);
setPageSource(pageStore);
}

@Override
public void setPageSource(PageReadStore pageStore) {
keyReader.setPageSource(pageStore);
valueReader.setPageSource(pageStore);
}

@Override
Expand Down Expand Up @@ -720,8 +757,13 @@ protected StructReader(List<Type> types, List<ParquetValueReader<?>> readers) {

@Override
public final void setPageSource(PageReadStore pageStore, long rowPosition) {
setPageSource(pageStore);
}

@Override
public final void setPageSource(PageReadStore pageStore) {
for (ParquetValueReader<?> reader : readers) {
reader.setPageSource(pageStore, rowPosition);
reader.setPageSource(pageStore);
}
}

Expand Down
44 changes: 0 additions & 44 deletions parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,19 @@
package org.apache.iceberg.parquet;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
Expand All @@ -59,7 +55,6 @@ class ReadConf<T> {
private final long totalValues;
private final boolean reuseContainers;
private final Integer batchSize;
private final long[] startRowPositions;

// List of column chunk metadata for each row group
private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
Expand Down Expand Up @@ -95,10 +90,6 @@ class ReadConf<T> {

this.rowGroups = reader.getRowGroups();
this.shouldSkip = new boolean[rowGroups.size()];
this.startRowPositions = new long[rowGroups.size()];

// Fetch all row groups starting positions to compute the row offsets of the filtered row groups
Map<Long, Long> offsetToStartPos = generateOffsetToStartPos(expectedSchema);

ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
Expand All @@ -112,8 +103,6 @@ class ReadConf<T> {
long computedTotalValues = 0L;
for (int i = 0; i < shouldSkip.length; i += 1) {
BlockMetaData rowGroup = rowGroups.get(i);
startRowPositions[i] =
offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos());
boolean shouldRead =
filter == null
|| (statsFilter.shouldRead(typeWithIds, rowGroup)
Expand Down Expand Up @@ -155,7 +144,6 @@ private ReadConf(ReadConf<T> toCopy) {
this.batchSize = toCopy.batchSize;
this.vectorizedModel = toCopy.vectorizedModel;
this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
this.startRowPositions = toCopy.startRowPositions;
}

ParquetFileReader reader() {
Expand All @@ -181,38 +169,6 @@ boolean[] shouldSkip() {
return shouldSkip;
}

private Map<Long, Long> generateOffsetToStartPos(Schema schema) {
if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
return null;
}

FileDecryptionProperties decryptionProperties =
(options == null) ? null : options.getDecryptionProperties();

ParquetReadOptions readOptions =
ParquetReadOptions.builder().withDecryption(decryptionProperties).build();

try (ParquetFileReader fileReader = newReader(file, readOptions)) {
Map<Long, Long> offsetToStartPos = Maps.newHashMap();

long curRowCount = 0;
for (int i = 0; i < fileReader.getRowGroups().size(); i += 1) {
BlockMetaData meta = fileReader.getRowGroups().get(i);
offsetToStartPos.put(meta.getStartingPos(), curRowCount);
curRowCount += meta.getRowCount();
}

return offsetToStartPos;

} catch (IOException e) {
throw new UncheckedIOException("Failed to create/close reader for file: " + file, e);
}
}

long[] startRowPositions() {
return startRowPositions;
}

long totalValues() {
return totalValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private long nextRowGroupStart = 0;
private long valuesRead = 0;
private T last = null;
private final long[] rowGroupsStartRowPos;

FileIterator(ReadConf conf) {
this.reader = conf.reader();
Expand All @@ -124,7 +123,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
this.batchSize = conf.batchSize();
this.model.setBatchSize(this.batchSize);
this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
this.rowGroupsStartRowPos = conf.startRowPositions();
}

@Override
Expand Down Expand Up @@ -165,8 +163,7 @@ private void advance() {
throw new RuntimeIOException(e);
}

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);
model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,25 @@ public interface VectorizedReader<T> {
* @param pages row group information for all the columns
* @param metadata map of {@link ColumnPath} -&gt; {@link ColumnChunkMetaData} for the row group
* @param rowPosition the row group's row offset in the parquet file
* @deprecated since 1.8.0, will be removed in 1.9.0; use {@link #setRowGroupInfo(PageReadStore,
* Map)} instead.
*/
@Deprecated
void setRowGroupInfo(
PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition);

/**
* Sets the row group information to be used with this reader
*
* @param pages row group information for all the columns
* @param metadata map of {@link ColumnPath} -&gt; {@link ColumnChunkMetaData} for the row group
*/
default void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata) {
throw new UnsupportedOperationException(
this.getClass().getName()
+ " doesn't implement setRowGroupInfo(PageReadStore, Map<ColumnPath, ColumnChunkMetaData>)");
}

/** Release any resources allocated. */
void close();
}
Loading