Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ protected BaseBatchReader(List<VectorizedReader<?>> readers) {

@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
for (VectorizedArrowReader reader : readers) {
if (reader != null) {
reader.setRowGroupInfo(pageStore, metaData, rowPosition);
reader.setRowGroupInfo(pageStore, metaData);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,7 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF
}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath()));
this.dictionary =
vectorizedColumnIterator.setRowGroupInfo(
Expand Down Expand Up @@ -436,7 +435,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
Expand Down Expand Up @@ -502,8 +501,8 @@ private static NullabilityHolder newNullabilityHolder(int size) {

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
this.rowStart = rowPosition;
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
this.rowStart = source.getRowIndexOffset().orElse(0L);
}

@Override
Expand Down Expand Up @@ -545,7 +544,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
Expand All @@ -570,7 +569,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private final ParquetValueReader<T> model;
private final long totalValues;
private final boolean reuseContainers;
private final long[] rowGroupsStartRowPos;

private int nextRowGroup = 0;
private long nextRowGroupStart = 0;
Expand All @@ -112,7 +111,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
this.model = conf.model();
this.totalValues = conf.totalValues();
this.reuseContainers = conf.reuseContainers();
this.rowGroupsStartRowPos = conf.startRowPositions();
}

@Override
Expand Down Expand Up @@ -149,11 +147,10 @@ private void advance() {
throw new RuntimeIOException(e);
}

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;

model.setPageSource(pages, rowPosition);
model.setPageSource(pages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ public interface ParquetValueReader<T> {

List<TripleIterator<?>> columns();

void setPageSource(PageReadStore pageStore, long rowPosition);
void setPageSource(PageReadStore pageStore);
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public List<TripleIterator<?>> columns() {
}

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {}
public void setPageSource(PageReadStore pageStore) {}
}

static class ConstantReader<C> implements ParquetValueReader<C> {
Expand All @@ -134,7 +134,7 @@ public List<TripleIterator<?>> columns() {
}

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {}
public void setPageSource(PageReadStore pageStore) {}
}

static class PositionReader implements ParquetValueReader<Long> {
Expand All @@ -158,8 +158,8 @@ public List<TripleIterator<?>> columns() {
}

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
this.rowGroupStart = rowPosition;
public void setPageSource(PageReadStore pageStore) {
this.rowGroupStart = pageStore.getRowIndexOffset().orElse(0L);
this.rowOffset = -1;
}
}
Expand All @@ -179,7 +179,7 @@ protected PrimitiveReader(ColumnDescriptor desc) {
}

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
public void setPageSource(PageReadStore pageStore) {
column.setPageSource(pageStore.getPageReader(desc));
}

Expand Down Expand Up @@ -363,8 +363,8 @@ private static class OptionReader<T> implements ParquetValueReader<T> {
}

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
reader.setPageSource(pageStore, rowPosition);
public void setPageSource(PageReadStore pageStore) {
reader.setPageSource(pageStore);
}

@Override
Expand Down Expand Up @@ -408,8 +408,8 @@ protected RepeatedReader(
}

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
reader.setPageSource(pageStore, rowPosition);
public void setPageSource(PageReadStore pageStore) {
reader.setPageSource(pageStore);
}

@Override
Expand Down Expand Up @@ -527,9 +527,9 @@ protected RepeatedKeyValueReader(
}

@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
keyReader.setPageSource(pageStore, rowPosition);
valueReader.setPageSource(pageStore, rowPosition);
public void setPageSource(PageReadStore pageStore) {
keyReader.setPageSource(pageStore);
valueReader.setPageSource(pageStore);
}

@Override
Expand Down Expand Up @@ -685,9 +685,9 @@ protected StructReader(List<Type> types, List<ParquetValueReader<?>> readers) {
}

@Override
public final void setPageSource(PageReadStore pageStore, long rowPosition) {
public final void setPageSource(PageReadStore pageStore) {
for (int i = 0; i < readers.length; i += 1) {
readers[i].setPageSource(pageStore, rowPosition);
readers[i].setPageSource(pageStore);
}
}

Expand Down
37 changes: 0 additions & 37 deletions parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,18 @@
package org.apache.iceberg.parquet;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
Expand All @@ -58,7 +55,6 @@ class ReadConf<T> {
private final long totalValues;
private final boolean reuseContainers;
private final Integer batchSize;
private final long[] startRowPositions;

// List of column chunk metadata for each row group
private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
Expand Down Expand Up @@ -94,10 +90,6 @@ class ReadConf<T> {

this.rowGroups = reader.getRowGroups();
this.shouldSkip = new boolean[rowGroups.size()];
this.startRowPositions = new long[rowGroups.size()];

// Fetch all row groups starting positions to compute the row offsets of the filtered row groups
Map<Long, Long> offsetToStartPos = generateOffsetToStartPos(expectedSchema);

ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
Expand All @@ -111,8 +103,6 @@ class ReadConf<T> {
long computedTotalValues = 0L;
for (int i = 0; i < shouldSkip.length; i += 1) {
BlockMetaData rowGroup = rowGroups.get(i);
startRowPositions[i] =
offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos());
boolean shouldRead =
filter == null
|| (statsFilter.shouldRead(typeWithIds, rowGroup)
Expand Down Expand Up @@ -154,7 +144,6 @@ private ReadConf(ReadConf<T> toCopy) {
this.batchSize = toCopy.batchSize;
this.vectorizedModel = toCopy.vectorizedModel;
this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
this.startRowPositions = toCopy.startRowPositions;
}

ParquetFileReader reader() {
Expand All @@ -180,32 +169,6 @@ boolean[] shouldSkip() {
return shouldSkip;
}

/**
 * Builds a map from each row group's starting file offset to the file-level position of its
 * first row.
 *
 * <p>Returns {@code null} when the expected schema does not project the row-position metadata
 * column, allowing callers to skip the extra footer read entirely.
 *
 * @param schema the expected read schema
 * @return map of row-group starting offset to cumulative starting row position, or {@code null}
 *     when row positions are not needed
 */
private Map<Long, Long> generateOffsetToStartPos(Schema schema) {
  if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
    // _pos is not projected, so there is no need to compute row-group start positions
    return null;
  }

  // open a second, unfiltered reader so every row group is visited in file order,
  // independent of any filtering applied to the primary reader
  try (ParquetFileReader fileReader = newReader(file, ParquetReadOptions.builder().build())) {
    Map<Long, Long> offsetToStartPos = Maps.newHashMap();

    long rowOffset = 0L;
    for (BlockMetaData blockMeta : fileReader.getRowGroups()) {
      offsetToStartPos.put(blockMeta.getStartingPos(), rowOffset);
      rowOffset += blockMeta.getRowCount();
    }

    return offsetToStartPos;

  } catch (IOException e) {
    throw new UncheckedIOException("Failed to create/close reader for file: " + file, e);
  }
}

// Returns the starting row position of each row group, indexed in file order.
// NOTE(review): entries appear to be 0 when row positions were not computed
// (see the null return of generateOffsetToStartPos) — confirm with callers.
long[] startRowPositions() {
return startRowPositions;
}

// Returns the total value count accumulated while filtering row groups in the
// constructor (computedTotalValues); used by iterators as the read bound.
long totalValues() {
return totalValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private long nextRowGroupStart = 0;
private long valuesRead = 0;
private T last = null;
private final long[] rowGroupsStartRowPos;

FileIterator(ReadConf conf) {
this.reader = conf.reader();
Expand All @@ -124,7 +123,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
this.batchSize = conf.batchSize();
this.model.setBatchSize(this.batchSize);
this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
this.rowGroupsStartRowPos = conf.startRowPositions();
}

@Override
Expand Down Expand Up @@ -165,8 +163,7 @@ private void advance() {
throw new RuntimeIOException(e);
}

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);
model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ public interface VectorizedReader<T> {
*
* @param pages row group information for all the columns
* @param metadata map of {@link ColumnPath} -&gt; {@link ColumnChunkMetaData} for the row group
* @param rowPosition the row group's row offset in the parquet file
*/
void setRowGroupInfo(
PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition);
void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata);

/** Release any resources allocated. */
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public ColumnarBatchReader(List<VectorizedReader<?>> readers) {

@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
super.setRowGroupInfo(pageStore, metaData, rowPosition);
this.rowStartPosInBatch = rowPosition;
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
super.setRowGroupInfo(pageStore, metaData);
this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L);
}

public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public ColumnarBatchReader(List<VectorizedReader<?>> readers) {

@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
super.setRowGroupInfo(pageStore, metaData, rowPosition);
this.rowStartPosInBatch = rowPosition;
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
super.setRowGroupInfo(pageStore, metaData);
this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L);
}

public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public ColumnarBatchReader(List<VectorizedReader<?>> readers) {

@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
super.setRowGroupInfo(pageStore, metaData, rowPosition);
this.rowStartPosInBatch = rowPosition;
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
super.setRowGroupInfo(pageStore, metaData);
this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L);
}

public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
Expand Down