diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java index 2175293ab2b6..3222afeb53ff 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java @@ -41,10 +41,10 @@ protected BaseBatchReader(List> readers) { @Override public void setRowGroupInfo( - PageReadStore pageStore, Map metaData, long rowPosition) { + PageReadStore pageStore, Map metaData) { for (VectorizedArrowReader reader : readers) { if (reader != null) { - reader.setRowGroupInfo(pageStore, metaData, rowPosition); + reader.setRowGroupInfo(pageStore, metaData); } } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 79cbfb34bd54..e7828e4aefb8 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -393,8 +393,7 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF } @Override - public void setRowGroupInfo( - PageReadStore source, Map metadata, long rowPosition) { + public void setRowGroupInfo(PageReadStore source, Map metadata) { ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath())); this.dictionary = vectorizedColumnIterator.setRowGroupInfo( @@ -436,7 +435,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { @Override public void setRowGroupInfo( - PageReadStore source, Map metadata, long rowPosition) {} + PageReadStore source, Map metadata) {} @Override public String toString() { @@ -502,8 +501,8 @@ private static NullabilityHolder newNullabilityHolder(int size) { @Override public void setRowGroupInfo( - PageReadStore source, Map metadata, long rowPosition) { - this.rowStart = rowPosition; + PageReadStore source, Map metadata) { + this.rowStart = source.getRowIndexOffset().orElse(0L); } @Override @@ -545,7 +544,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { @Override public void setRowGroupInfo( - PageReadStore source, Map metadata, long rowPosition) {} + PageReadStore source, Map metadata) {} @Override public String toString() { @@ -570,7 +569,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { @Override public void setRowGroupInfo( - PageReadStore source, Map metadata, long rowPosition) {} + PageReadStore source, Map metadata) {} @Override public String toString() { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index c1d8b0ccbbad..e8ee90fdebb7 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -99,7 +99,6 @@ private static class FileIterator implements CloseableIterator { private final ParquetValueReader model; private final long totalValues; private final boolean reuseContainers; - private final long[] rowGroupsStartRowPos; private int nextRowGroup = 0; private long nextRowGroupStart = 0; @@ -112,7 +111,6 @@ private static class FileIterator implements CloseableIterator { this.model = conf.model(); this.totalValues = conf.totalValues(); this.reuseContainers = conf.reuseContainers(); - this.rowGroupsStartRowPos = conf.startRowPositions(); } @Override @@ -149,11 +147,10 @@ private void advance() { throw new RuntimeIOException(e); } - long rowPosition = rowGroupsStartRowPos[nextRowGroup]; nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; - model.setPageSource(pages, rowPosition); + model.setPageSource(pages); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java index b6c2b5b70303..b07941fc460c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java @@ -28,5 +28,5 @@ public interface ParquetValueReader { List> columns(); - void setPageSource(PageReadStore pageStore, long rowPosition); + void setPageSource(PageReadStore pageStore); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index b995e1707112..5480ce1d0592 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -108,7 +108,7 @@ public List> columns() { } @Override - public void setPageSource(PageReadStore pageStore, long rowPosition) {} + public void setPageSource(PageReadStore pageStore) {} } static class ConstantReader implements ParquetValueReader { @@ -134,7 +134,7 @@ public List> columns() { } @Override - public void setPageSource(PageReadStore pageStore, long rowPosition) {} + public void setPageSource(PageReadStore pageStore) {} } static class PositionReader implements ParquetValueReader { @@ -158,8 +158,8 @@ public List> columns() { } @Override - public void setPageSource(PageReadStore pageStore, long rowPosition) { - this.rowGroupStart = rowPosition; + public void setPageSource(PageReadStore pageStore) { + this.rowGroupStart = pageStore.getRowIndexOffset().orElse(0L); this.rowOffset = -1; } } @@ -179,7 +179,7 @@ protected PrimitiveReader(ColumnDescriptor desc) { } @Override - public void setPageSource(PageReadStore pageStore, long rowPosition) { + public void setPageSource(PageReadStore pageStore) { column.setPageSource(pageStore.getPageReader(desc)); } @@ -363,8 +363,8 @@ private static class OptionReader implements ParquetValueReader { } @Override - public void setPageSource(PageReadStore pageStore, long rowPosition) { - reader.setPageSource(pageStore, rowPosition); + public void setPageSource(PageReadStore pageStore) { + reader.setPageSource(pageStore); } @Override @@ -408,8 +408,8 @@ protected RepeatedReader( } @Override - public void setPageSource(PageReadStore pageStore, long rowPosition) { - reader.setPageSource(pageStore, rowPosition); + public void setPageSource(PageReadStore pageStore) { + reader.setPageSource(pageStore); } @Override @@ -527,9 +527,9 @@ protected RepeatedKeyValueReader( } @Override - public void setPageSource(PageReadStore pageStore, long rowPosition) { - keyReader.setPageSource(pageStore, rowPosition); - valueReader.setPageSource(pageStore, rowPosition); + public void setPageSource(PageReadStore pageStore) { + keyReader.setPageSource(pageStore); + valueReader.setPageSource(pageStore); } @Override @@ -685,9 +685,9 @@ protected StructReader(List types, List> readers) { } @Override - public final void setPageSource(PageReadStore pageStore, long rowPosition) { + public final void setPageSource(PageReadStore pageStore) { for (int i = 0; i < readers.length; i += 1) { - readers[i].setPageSource(pageStore, rowPosition); + readers[i].setPageSource(pageStore); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index 7bb89a30f8e9..1fb2372ba568 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -19,13 +19,11 @@ package org.apache.iceberg.parquet; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; @@ -33,7 +31,6 @@ import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -58,7 +55,6 @@ class ReadConf { private final long totalValues; private final boolean reuseContainers; private final Integer batchSize; - private final long[] startRowPositions; // List of column chunk metadata for each row group private final List> columnChunkMetaDataForRowGroups; @@ -94,10 +90,6 @@ class ReadConf { this.rowGroups = reader.getRowGroups(); this.shouldSkip = new boolean[rowGroups.size()]; - this.startRowPositions = new long[rowGroups.size()]; - - // Fetch all row groups starting positions to compute the row offsets of the filtered row groups - Map offsetToStartPos = generateOffsetToStartPos(expectedSchema); ParquetMetricsRowGroupFilter statsFilter = null; ParquetDictionaryRowGroupFilter dictFilter = null; @@ -111,8 +103,6 @@ class ReadConf { long computedTotalValues = 0L; for (int i = 0; i < shouldSkip.length; i += 1) { BlockMetaData rowGroup = rowGroups.get(i); - startRowPositions[i] = - offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos()); boolean shouldRead = filter == null || (statsFilter.shouldRead(typeWithIds, rowGroup) @@ -154,7 +144,6 @@ private ReadConf(ReadConf toCopy) { this.batchSize = toCopy.batchSize; this.vectorizedModel = toCopy.vectorizedModel; this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups; - this.startRowPositions = toCopy.startRowPositions; } ParquetFileReader reader() { @@ -180,32 +169,6 @@ boolean[] shouldSkip() { return shouldSkip; } - private Map generateOffsetToStartPos(Schema schema) { - if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) { - return null; - } - - try (ParquetFileReader fileReader = newReader(file, ParquetReadOptions.builder().build())) { - Map offsetToStartPos = Maps.newHashMap(); - - long curRowCount = 0; - for (int i = 0; i < fileReader.getRowGroups().size(); i += 1) { - BlockMetaData meta = fileReader.getRowGroups().get(i); - offsetToStartPos.put(meta.getStartingPos(), curRowCount); - curRowCount += meta.getRowCount(); - } - - return offsetToStartPos; - - } catch (IOException e) { - throw new UncheckedIOException("Failed to create/close reader for file: " + file, e); - } - } - - long[] startRowPositions() { - return startRowPositions; - } - long totalValues() { return totalValues; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 773e0f7a85d0..8d99f36b4ceb 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -113,7 +113,6 @@ private static class FileIterator implements CloseableIterator { private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; - private final long[] rowGroupsStartRowPos; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -124,7 +123,6 @@ private static class FileIterator implements CloseableIterator { this.batchSize = conf.batchSize(); this.model.setBatchSize(this.batchSize); this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); - this.rowGroupsStartRowPos = conf.startRowPositions(); } @Override @@ -165,8 +163,7 @@ private void advance() { throw new RuntimeIOException(e); } - long rowPosition = rowGroupsStartRowPos[nextRowGroup]; - model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition); + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java index 72b1e39e9634..d44c4615e6e7 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java @@ -42,10 +42,8 @@ public interface VectorizedReader { * * @param pages row group information for all the columns * @param metadata map of {@link ColumnPath} -> {@link ColumnChunkMetaData} for the row group - * @param rowPosition the row group's row offset in the parquet file */ - void setRowGroupInfo( - PageReadStore pages, Map metadata, long rowPosition); + void setRowGroupInfo(PageReadStore pages, Map metadata); /** Release any resources allocated. */ void close(); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 72b1345fa867..e9e70c21c2a5 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -49,9 +49,9 @@ public ColumnarBatchReader(List> readers) { @Override public void setRowGroupInfo( - PageReadStore pageStore, Map metaData, long rowPosition) { - super.setRowGroupInfo(pageStore, metaData, rowPosition); - this.rowStartPosInBatch = rowPosition; + PageReadStore pageStore, Map metaData) { + super.setRowGroupInfo(pageStore, metaData); + this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L); } public void setDeleteFilter(DeleteFilter deleteFilter) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f07d8c545e35..9282e03a620e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -54,9 +54,9 @@ public ColumnarBatchReader(List> readers) { @Override public void setRowGroupInfo( - PageReadStore pageStore, Map metaData, long rowPosition) { - super.setRowGroupInfo(pageStore, metaData, rowPosition); - this.rowStartPosInBatch = rowPosition; + PageReadStore pageStore, Map metaData) { + super.setRowGroupInfo(pageStore, metaData); + this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L); } public void setDeleteFilter(DeleteFilter deleteFilter) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index f07d8c545e35..9282e03a620e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -54,9 +54,9 @@ public ColumnarBatchReader(List> readers) { @Override public void setRowGroupInfo( - PageReadStore pageStore, Map metaData, long rowPosition) { - super.setRowGroupInfo(pageStore, metaData, rowPosition); - this.rowStartPosInBatch = rowPosition; + PageReadStore pageStore, Map metaData) { + super.setRowGroupInfo(pageStore, metaData); + this.rowStartPosInBatch = pageStore.getRowIndexOffset().orElse(0L); } public void setDeleteFilter(DeleteFilter deleteFilter) {