diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
index 2175293ab2b6..daa116f292ed 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
@@ -42,9 +42,15 @@ protected BaseBatchReader(List<VectorizedArrowReader<?>> readers) {
   @Override
   public void setRowGroupInfo(
       PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
+    setRowGroupInfo(pageStore, metaData);
+  }
+
+  @Override
+  public void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
     for (VectorizedArrowReader<?> reader : readers) {
       if (reader != null) {
-        reader.setRowGroupInfo(pageStore, metaData, rowPosition);
+        reader.setRowGroupInfo(pageStore, metaData);
       }
     }
   }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 27ee25124f16..411f241e169f 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -432,6 +432,11 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF
   @Override
   public void setRowGroupInfo(
       PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
+    setRowGroupInfo(source, metadata);
+  }
+
+  @Override
+  public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
     ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath()));
     this.dictionary =
         vectorizedColumnIterator.setRowGroupInfo(
@@ -475,6 +480,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
     public void setRowGroupInfo(
         PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}
 
+    @Override
+    public void setRowGroupInfo(
+        PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}
+
     @Override
     public String toString() {
       return "NullReader";
@@ -541,7 +550,19 @@ private static NullabilityHolder newNullabilityHolder(int size) {
     @Override
     public void setRowGroupInfo(
         PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
-      this.rowStart = rowPosition;
+      setRowGroupInfo(source, metadata);
+    }
+
+    @Override
+    public void setRowGroupInfo(
+        PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
+      this.rowStart =
+          source
+              .getRowIndexOffset()
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          "PageReadStore does not contain row index offset"));
     }
 
     @Override
@@ -586,6 +607,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
     public void setRowGroupInfo(
         PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}
 
+    @Override
+    public void setRowGroupInfo(
+        PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}
+
     @Override
     public String toString() {
       return String.format("ConstantReader: %s", value);
@@ -613,6 +638,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
     public void setRowGroupInfo(
         PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {}
 
+    @Override
+    public void setRowGroupInfo(
+        PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {}
+
     @Override
     public String toString() {
       return "DeletedVectorReader";
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index c1d8b0ccbbad..e8ee90fdebb7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -99,7 +99,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
     private final ParquetValueReader<T> model;
     private final long totalValues;
     private final boolean reuseContainers;
-    private final long[] rowGroupsStartRowPos;
 
     private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
@@ -112,7 +111,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
       this.model = conf.model();
       this.totalValues = conf.totalValues();
       this.reuseContainers = conf.reuseContainers();
-      this.rowGroupsStartRowPos = conf.startRowPositions();
     }
 
     @Override
@@ -149,11 +147,10 @@ private void advance() {
         throw new RuntimeIOException(e);
       }
 
-      long rowPosition = rowGroupsStartRowPos[nextRowGroup];
       nextRowGroupStart += pages.getRowCount();
       nextRowGroup += 1;
 
-      model.setPageSource(pages, rowPosition);
+      model.setPageSource(pages);
     }
 
     @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java
index b6c2b5b70303..a2ade5336621 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java
@@ -28,5 +28,15 @@ public interface ParquetValueReader<T> {
 
   List<TripleIterator<?>> columns();
 
+  /**
+   * @deprecated since 1.8.0, will be removed in 1.9.0; use {@link #setPageSource(PageReadStore)}
+   *     instead.
+   */
+  @Deprecated
   void setPageSource(PageReadStore pageStore, long rowPosition);
+
+  default void setPageSource(PageReadStore pageStore) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement setPageSource(PageReadStore)");
+  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 62a49da25e56..b055a139fa59 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -113,6 +113,9 @@ public List<TripleIterator<?>> columns() {
 
     @Override
     public void setPageSource(PageReadStore pageStore, long rowPosition) {}
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {}
   }
 
   static class ConstantReader<C> implements ParquetValueReader<C> {
@@ -176,6 +179,9 @@ public List<TripleIterator<?>> columns() {
 
     @Override
     public void setPageSource(PageReadStore pageStore, long rowPosition) {}
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {}
   }
 
   static class PositionReader implements ParquetValueReader<Long> {
@@ -200,7 +206,18 @@ public List<TripleIterator<?>> columns() {
 
     @Override
     public void setPageSource(PageReadStore pageStore, long rowPosition) {
-      this.rowGroupStart = rowPosition;
+      setPageSource(pageStore);
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
+      this.rowGroupStart =
+          pageStore
+              .getRowIndexOffset()
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          "PageReadStore does not contain row index offset"));
       this.rowOffset = -1;
     }
   }
@@ -221,6 +238,11 @@ protected PrimitiveReader(ColumnDescriptor desc) {
     @Override
     public void setPageSource(PageReadStore pageStore, long rowPosition) {
+      setPageSource(pageStore);
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
       column.setPageSource(pageStore.getPageReader(desc));
     }
 
     @Override
@@ -405,7 +427,12 @@ private static class OptionReader<T> implements ParquetValueReader<T> {
 
     @Override
     public void setPageSource(PageReadStore pageStore, long rowPosition) {
-      reader.setPageSource(pageStore, rowPosition);
+      setPageSource(pageStore);
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
+      reader.setPageSource(pageStore);
     }
 
     @Override
@@ -450,7 +477,12 @@ protected RepeatedReader(
 
     @Override
     public void setPageSource(PageReadStore pageStore, long rowPosition) {
-      reader.setPageSource(pageStore, rowPosition);
+      setPageSource(pageStore);
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
+      reader.setPageSource(pageStore);
     }
 
     @Override
@@ -569,8 +601,13 @@ protected RepeatedKeyValueReader(
 
     @Override
     public void setPageSource(PageReadStore pageStore, long rowPosition) {
-      keyReader.setPageSource(pageStore, rowPosition);
-      valueReader.setPageSource(pageStore, rowPosition);
+      setPageSource(pageStore);
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
+      keyReader.setPageSource(pageStore);
+      valueReader.setPageSource(pageStore);
     }
 
     @Override
@@ -720,8 +757,13 @@ protected StructReader(List<Type> types, List<ParquetValueReader<?>> readers) {
 
     @Override
     public final void setPageSource(PageReadStore pageStore, long rowPosition) {
+      setPageSource(pageStore);
+    }
+
+    @Override
+    public final void setPageSource(PageReadStore pageStore) {
       for (ParquetValueReader<?> reader : readers) {
-        reader.setPageSource(pageStore, rowPosition);
+        reader.setPageSource(pageStore);
       }
     }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
index da91e4dfa56a..1fb2372ba568 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -19,13 +19,11 @@
 package org.apache.iceberg.parquet;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
@@ -33,9 +31,7 @@
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -59,7 +55,6 @@ class ReadConf<T> {
   private final long totalValues;
   private final boolean reuseContainers;
   private final Integer batchSize;
-  private final long[] startRowPositions;
 
   // List of column chunk metadata for each row group
   private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
@@ -95,10 +90,6 @@ class ReadConf<T> {
 
     this.rowGroups = reader.getRowGroups();
     this.shouldSkip = new boolean[rowGroups.size()];
-    this.startRowPositions = new long[rowGroups.size()];
-
-    // Fetch all row groups starting positions to compute the row offsets of the filtered row groups
-    Map<Long, Long> offsetToStartPos = generateOffsetToStartPos(expectedSchema);
 
     ParquetMetricsRowGroupFilter statsFilter = null;
     ParquetDictionaryRowGroupFilter dictFilter = null;
@@ -112,8 +103,6 @@
     long computedTotalValues = 0L;
     for (int i = 0; i < shouldSkip.length; i += 1) {
       BlockMetaData rowGroup = rowGroups.get(i);
-      startRowPositions[i] =
-          offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos());
       boolean shouldRead =
           filter == null
               || (statsFilter.shouldRead(typeWithIds, rowGroup)
@@ -155,7 +144,6 @@ private ReadConf(ReadConf<T> toCopy) {
     this.batchSize = toCopy.batchSize;
     this.vectorizedModel = toCopy.vectorizedModel;
     this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
-    this.startRowPositions = toCopy.startRowPositions;
   }
 
   ParquetFileReader reader() {
@@ -181,38 +169,6 @@ boolean[] shouldSkip() {
     return shouldSkip;
   }
 
-  private Map<Long, Long> generateOffsetToStartPos(Schema schema) {
-    if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
-      return null;
-    }
-
-    FileDecryptionProperties decryptionProperties =
-        (options == null) ? null : options.getDecryptionProperties();
-
-    ParquetReadOptions readOptions =
-        ParquetReadOptions.builder().withDecryption(decryptionProperties).build();
-
-    try (ParquetFileReader fileReader = newReader(file, readOptions)) {
-      Map<Long, Long> offsetToStartPos = Maps.newHashMap();
-
-      long curRowCount = 0;
-      for (int i = 0; i < fileReader.getRowGroups().size(); i += 1) {
-        BlockMetaData meta = fileReader.getRowGroups().get(i);
-        offsetToStartPos.put(meta.getStartingPos(), curRowCount);
-        curRowCount += meta.getRowCount();
-      }
-
-      return offsetToStartPos;
-
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to create/close reader for file: " + file, e);
-    }
-  }
-
-  long[] startRowPositions() {
-    return startRowPositions;
-  }
-
   long totalValues() {
     return totalValues;
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
index 35d94f328d60..fc10a57ec0e0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -113,7 +113,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
     private long nextRowGroupStart = 0;
     private long valuesRead = 0;
    private T last = null;
-    private final long[] rowGroupsStartRowPos;
 
     FileIterator(ReadConf<T> conf) {
       this.reader = conf.reader();
@@ -124,7 +123,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
       this.batchSize = conf.batchSize();
       this.model.setBatchSize(this.batchSize);
       this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
-      this.rowGroupsStartRowPos = conf.startRowPositions();
     }
 
     @Override
@@ -165,8 +163,7 @@ private void advance() {
         throw new RuntimeIOException(e);
       }
 
-      long rowPosition = rowGroupsStartRowPos[nextRowGroup];
-      model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);
+      model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
       nextRowGroupStart += pages.getRowCount();
       nextRowGroup += 1;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
index 72b1e39e9634..caf2b6ff22e8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java
@@ -43,10 +43,25 @@ public interface VectorizedReader<T> {
    * @param pages row group information for all the columns
    * @param metadata map of {@link ColumnPath} -> {@link ColumnChunkMetaData} for the row group
    * @param rowPosition the row group's row offset in the parquet file
+   * @deprecated since 1.8.0, will be removed in 1.9.0; use {@link #setRowGroupInfo(PageReadStore,
+   *     Map)} instead.
    */
+  @Deprecated
   void setRowGroupInfo(
       PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition);
 
+  /**
+   * Sets the row group information to be used with this reader
+   *
+   * @param pages row group information for all the columns
+   * @param metadata map of {@link ColumnPath} -> {@link ColumnChunkMetaData} for the row group
+   */
+  default void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName()
+            + " doesn't implement setRowGroupInfo(PageReadStore, Map)");
+  }
+
   /** Release any resources allocated. */
   void close();
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index f07d8c545e35..77cb2ff771c8 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -55,8 +55,20 @@ public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
   @Override
   public void setRowGroupInfo(
       PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
-    super.setRowGroupInfo(pageStore, metaData, rowPosition);
-    this.rowStartPosInBatch = rowPosition;
+    setRowGroupInfo(pageStore, metaData);
+  }
+
+  @Override
+  public void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
+    super.setRowGroupInfo(pageStore, metaData);
+    this.rowStartPosInBatch =
+        pageStore
+            .getRowIndexOffset()
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        "PageReadStore does not contain row index offset"));
   }
 
   public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index f07d8c545e35..77cb2ff771c8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -55,8 +55,20 @@ public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
   @Override
   public void setRowGroupInfo(
       PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
-    super.setRowGroupInfo(pageStore, metaData, rowPosition);
-    this.rowStartPosInBatch = rowPosition;
+    setRowGroupInfo(pageStore, metaData);
+  }
+
+  @Override
+  public void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
+    super.setRowGroupInfo(pageStore, metaData);
+    this.rowStartPosInBatch =
+        pageStore
+            .getRowIndexOffset()
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        "PageReadStore does not contain row index offset"));
   }
 
   public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index f07d8c545e35..77cb2ff771c8 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -55,8 +55,20 @@ public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
   @Override
   public void setRowGroupInfo(
       PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
-    super.setRowGroupInfo(pageStore, metaData, rowPosition);
-    this.rowStartPosInBatch = rowPosition;
+    setRowGroupInfo(pageStore, metaData);
+  }
+
+  @Override
+  public void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
+    super.setRowGroupInfo(pageStore, metaData);
+    this.rowStartPosInBatch =
+        pageStore
+            .getRowIndexOffset()
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        "PageReadStore does not contain row index offset"));
   }
 
   public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
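Migration note for downstream implementations: the deprecated overloads above keep working by delegating, while the new overloads take the row-group start position from the PageReadStore itself via getRowIndexOffset() instead of a caller-supplied rowPosition. The sketch below is illustrative only and not part of this change; the class name RowTrackingReader and its fields are hypothetical, but PageReadStore, ColumnPath, and ColumnChunkMetaData are the real parquet-mr types used in the diff.

import java.util.Map;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;

// Hypothetical custom reader showing the same migration pattern as the readers in this diff.
public class RowTrackingReader {
  private long rowGroupStart;

  /**
   * @deprecated kept only for callers that still pass rowPosition; the value is ignored and the
   *     offset is read from the page store instead.
   */
  @Deprecated
  public void setRowGroupInfo(
      PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
    setRowGroupInfo(pages, metadata);
  }

  public void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata) {
    // metadata is unused in this sketch; it is kept for signature parity with the diff.
    // getRowIndexOffset() returns an Optional<Long> that is populated when the underlying
    // file reader exposes row indexes; failing loudly matches the behavior of the readers above.
    this.rowGroupStart =
        pages
            .getRowIndexOffset()
            .orElseThrow(
                () ->
                    new IllegalArgumentException(
                        "PageReadStore does not contain row index offset"));
  }

  public long rowGroupStart() {
    return rowGroupStart;
  }
}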