From 74db80db82863236af811a324d02b618338a49be Mon Sep 17 00:00:00 2001
From: Lukasz Stec
Date: Tue, 13 Dec 2022 12:03:19 +0100
Subject: [PATCH 1/6] Implement toString on ReferenceCountedReader

Improves readability of exceptions that reference ReferenceCountedReader
in the message
---
 .../io/trino/parquet/AbstractParquetDataSource.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java
index 350a88313f47..2b15ed653980 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static java.lang.Math.toIntExact;
@@ -300,5 +301,14 @@ public void free()
                 data = null;
             }
         }
+
+        @Override
+        public String toString()
+        {
+            return toStringHelper(this)
+                    .add("range", range)
+                    .add("referenceCount", referenceCount)
+                    .toString();
+        }
     }
 }

From c13474baf47eb096e8c50818bd3dc2ab279eb0b4 Mon Sep 17 00:00:00 2001
From: Lukasz Stec
Date: Tue, 13 Dec 2022 14:44:14 +0100
Subject: [PATCH 2/6] Fix processValues javadoc formatting

---
 .../java/io/trino/parquet/reader/PrimitiveColumnReader.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java
index 6d7c7262e336..264a3d377268 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java
@@ -204,7 +204,7 @@ private void skipValues(long valuesToRead)
      * 100 │ p5 │ │ │
      * └──────┴──────┴──────┘
      *
-     *

* The pages 1, 2, 3 in col1 are skipped so we have to skip the rows [20, 79]. Because page 1 in col2 contains values * only for the rows [40, 79] we skip this entire page as well. To synchronize the row reading we have to skip the * values (and the related rl and dl) for the rows [20, 39] in the end of the page 0 for col2. Similarly, we have to From cc18eb1e36041269022f39e3acdf44a25dc21f74 Mon Sep 17 00:00:00 2001 From: Lukasz Stec Date: Tue, 20 Dec 2022 15:36:09 +0100 Subject: [PATCH 3/6] Rename TestHdfsParquetDataSource to TestParquetDataSource --- ...estHdfsParquetDataSource.java => TestParquetDataSource.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/{TestHdfsParquetDataSource.java => TestParquetDataSource.java} (98%) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetDataSource.java similarity index 98% rename from plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java rename to plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetDataSource.java index 802becf294f4..cd9bf448ee1c 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetDataSource.java @@ -37,7 +37,7 @@ import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static org.assertj.core.api.Assertions.assertThat; -public class TestHdfsParquetDataSource +public class TestParquetDataSource { @Test(dataProvider = "testPlanReadOrderingProvider") public void testPlanReadOrdering(DataSize maxBufferSize) From 5c64bce971db2d3be6251ef0757f1774efb62f4f Mon Sep 17 00:00:00 2001 From: Lukasz Stec Date: Tue, 20 Dec 2022 15:45:50 +0100 Subject: [PATCH 4/6] Move TestParquetDataSource to trino-parquet TestParquetDataSource tests only a generic AbstractParquetDataSource logic so it should be in trino-parquet. --- .../reader}/TestParquetDataSource.java | 20 ++------- .../reader/TestingParquetDataSource.java | 44 +++++++++++++++++++ 2 files changed, 48 insertions(+), 16 deletions(-) rename {plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet => lib/trino-parquet/src/test/java/io/trino/parquet/reader}/TestParquetDataSource.java (74%) create mode 100644 lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingParquetDataSource.java diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java similarity index 74% rename from plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetDataSource.java rename to lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java index cd9bf448ee1c..9b3105f46550 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetDataSource.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.trino.plugin.hive.parquet; +package io.trino.parquet.reader; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; @@ -19,22 +19,16 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; -import io.trino.filesystem.TrinoFileSystem; import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetReaderOptions; -import io.trino.plugin.hive.FileFormatDataSourceStats; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.io.IOException; -import java.io.OutputStream; -import java.util.UUID; import java.util.stream.IntStream; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static org.assertj.core.api.Assertions.assertThat; public class TestParquetDataSource @@ -44,15 +38,9 @@ public void testPlanReadOrdering(DataSize maxBufferSize) throws IOException { Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray()); - String path = "/tmp/" + UUID.randomUUID(); - TrinoFileSystem trinoFileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); - try (OutputStream outputStream = trinoFileSystem.newOutputFile(path).create(newSimpleAggregatedMemoryContext())) { - outputStream.write(testingInput.getBytes()); - } - TrinoParquetDataSource dataSource = new TrinoParquetDataSource( - trinoFileSystem.newInputFile(path), - new ParquetReaderOptions().withMaxBufferSize(maxBufferSize), - new FileFormatDataSourceStats()); + TestingParquetDataSource dataSource = new TestingParquetDataSource( + testingInput, + new ParquetReaderOptions().withMaxBufferSize(maxBufferSize)); ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder() .putAll("test", new DiskRange(0, 300), new DiskRange(400, 100), new DiskRange(700, 200)) diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingParquetDataSource.java new file mode 100644 index 000000000000..1ae1fabdc322 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingParquetDataSource.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.parquet.reader;
+
+import io.airlift.slice.Slice;
+import io.trino.parquet.AbstractParquetDataSource;
+import io.trino.parquet.ParquetDataSourceId;
+import io.trino.parquet.ParquetReaderOptions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.lang.Math.toIntExact;
+
+public class TestingParquetDataSource
+        extends AbstractParquetDataSource
+{
+    private final Slice input;
+
+    public TestingParquetDataSource(Slice slice, ParquetReaderOptions options)
+            throws IOException
+    {
+        super(new ParquetDataSourceId(UUID.randomUUID().toString()), slice.length(), options);
+        this.input = slice;
+    }
+
+    @Override
+    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
+            throws IOException
+    {
+        input.getBytes(toIntExact(position), buffer, bufferOffset, bufferLength);
+    }
+}

From 7be2201e46cc74953b206a2f8ca29eeb1a4a4a94 Mon Sep 17 00:00:00 2001
From: Lukasz Stec
Date: Tue, 29 Nov 2022 08:24:05 +0100
Subject: [PATCH 5/6] Read parquet column chunks in small steps

Before this change, parquet column chunks were read in one go, copying
everything into one big Slice. This had two issues. First, for limit
queries we potentially don't need to read the entire column chunk to
finish the query, as the first page may already satisfy the limit.
Second, for files with a big row group size the allocated Slice can
exceed the JVM limit for a native byte array, and even if it doesn't,
memory usage becomes inefficient due to how humongous allocations are
implemented in the JVM.
---
 .../parquet/AbstractParquetDataSource.java    |  47 +-
 .../io/trino/parquet/ParquetDataSource.java   |   3 +-
 .../parquet/reader/ChunkedInputStream.java    | 151 ++++++
 .../io/trino/parquet/reader/PageReader.java   | 104 +++--
 ...k.java => ParquetColumnChunkIterator.java} | 132 +++---
 .../trino/parquet/reader/ParquetReader.java   |  58 +--
 .../parquet/reader/PrimitiveColumnReader.java |   6 -
 .../parquet/reader/TrinoColumnIndexStore.java |  22 +-
 .../parquet/reader/flat/FlatColumnReader.java |   1 -
 .../reader/AbstractColumnReaderBenchmark.java |   2 +-
 .../reader/TestChunkedInputStream.java        | 153 +++++++
 .../parquet/reader/TestColumnReader.java      |  13 +-
 .../parquet/reader/TestInt96Timestamp.java    |   3 +-
 .../trino/parquet/reader/TestPageReader.java  | 431 ++++++++++++++++++
 .../parquet/reader/TestParquetDataSource.java |  54 ++-
 .../parquet/reader/TestingChunkReader.java    |  56 +++
 .../reader/flat/TestFlatColumnReader.java     |  20 +-
 .../hive/parquet/ParquetReaderConfig.java     |   2 +
 .../plugin/hive/TestParquetPageSkipping.java  |   3 +-
 ...arquetPageSkippingWithOptimizedReader.java |   3 +-
 20 files changed, 1065 insertions(+), 199 deletions(-)
 create mode 100644 lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java
 rename lib/trino-parquet/src/main/java/io/trino/parquet/reader/{ParquetColumnChunk.java => ParquetColumnChunkIterator.java} (60%)
 create mode 100644 lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java
 create mode 100644 lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java
 create mode 100644 lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingChunkReader.java

diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java
index 2b15ed653980..f8edc10eb24f 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java
+++ 
b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java @@ -19,6 +19,8 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; +import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.memory.context.LocalMemoryContext; import java.io.IOException; import java.util.ArrayList; @@ -27,6 +29,7 @@ import java.util.Map; import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static java.lang.Math.toIntExact; @@ -117,7 +120,7 @@ private void readFully(long position, byte[] buffer, int bufferOffset, int buffe } @Override - public final ListMultimap planRead(ListMultimap diskRanges) + public final ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { requireNonNull(diskRanges, "diskRanges is null"); @@ -137,7 +140,7 @@ public final ListMultimap planRead(ListMultimap smallRanges = smallRangesBuilder.build(); @@ -145,16 +148,36 @@ public final ListMultimap planRead(ListMultimap slices = ImmutableListMultimap.builder(); - slices.putAll(readSmallDiskRanges(smallRanges)); - slices.putAll(readLargeDiskRanges(largeRanges)); - // Re-order ChunkReaders by their DiskRange offsets as ParquetColumnChunk expects + slices.putAll(readSmallDiskRanges(smallRanges, memoryContext)); + slices.putAll(readLargeDiskRanges(largeRanges, memoryContext)); + // Re-order ChunkReaders by their DiskRange offsets as ParquetColumnChunkIterator expects // the input slices to be in the order that they're present in the file slices.orderValuesBy(comparingLong(ChunkReader::getDiskOffset)); return slices.build(); } - private ListMultimap readSmallDiskRanges(ListMultimap diskRanges) + private List splitLargeRange(DiskRange range) + { + int maxBufferSizeBytes = toIntExact(options.getMaxBufferSize().toBytes()); + checkArgument(maxBufferSizeBytes > 0, "maxBufferSize must by larger than zero but is %s bytes", maxBufferSizeBytes); + ImmutableList.Builder ranges = ImmutableList.builder(); + long endOffset = range.getOffset() + range.getLength(); + long offset = range.getOffset(); + while (offset + maxBufferSizeBytes < endOffset) { + ranges.add(new DiskRange(offset, maxBufferSizeBytes)); + offset += maxBufferSizeBytes; + } + + long lengthLeft = endOffset - offset; + if (lengthLeft > 0) { + ranges.add(new DiskRange(offset, toIntExact(lengthLeft))); + } + + return ranges.build(); + } + + private ListMultimap readSmallDiskRanges(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { if (diskRanges.isEmpty()) { return ImmutableListMultimap.of(); @@ -164,7 +187,7 @@ private ListMultimap readSmallDiskRanges(ListMultimap slices = ImmutableListMultimap.builder(); for (DiskRange mergedRange : mergedRanges) { - ReferenceCountedReader mergedRangeLoader = new ReferenceCountedReader(mergedRange); + ReferenceCountedReader mergedRangeLoader = new ReferenceCountedReader(mergedRange, memoryContext); for (Map.Entry diskRangeEntry : diskRanges.entries()) { DiskRange diskRange = diskRangeEntry.getValue(); @@ -204,7 +227,7 @@ public void free() return sliceStreams; } - private ListMultimap readLargeDiskRanges(ListMultimap diskRanges) + private ListMultimap readLargeDiskRanges(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { if (diskRanges.isEmpty()) { return ImmutableListMultimap.of(); @@ -212,7 +235,7 @@ private 
ListMultimap readLargeDiskRanges(ListMultimap slices = ImmutableListMultimap.builder(); for (Map.Entry entry : diskRanges.entries()) { - slices.put(entry.getKey(), new ReferenceCountedReader(entry.getValue())); + slices.put(entry.getKey(), new ReferenceCountedReader(entry.getValue(), memoryContext)); } return slices.build(); } @@ -256,12 +279,14 @@ private class ReferenceCountedReader implements ChunkReader { private final DiskRange range; + private final LocalMemoryContext readerMemoryUsage; private Slice data; private int referenceCount = 1; - public ReferenceCountedReader(DiskRange range) + public ReferenceCountedReader(DiskRange range, AggregatedMemoryContext memoryContext) { this.range = range; + this.readerMemoryUsage = memoryContext.newLocalMemoryContext(ReferenceCountedReader.class.getSimpleName()); } public void addReference() @@ -286,6 +311,7 @@ public Slice read() byte[] buffer = new byte[range.getLength()]; readFully(range.getOffset(), buffer, 0, buffer.length); data = Slices.wrappedBuffer(buffer); + readerMemoryUsage.setBytes(data.length()); } return data; @@ -299,6 +325,7 @@ public void free() referenceCount--; if (referenceCount == 0) { data = null; + readerMemoryUsage.setBytes(0); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java index 4fe7ed74ec7b..cfa55200525e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java @@ -15,6 +15,7 @@ import com.google.common.collect.ListMultimap; import io.airlift.slice.Slice; +import io.trino.memory.context.AggregatedMemoryContext; import java.io.Closeable; import java.io.IOException; @@ -36,7 +37,7 @@ Slice readTail(int length) Slice readFully(long position, int length) throws IOException; - ListMultimap planRead(ListMultimap diskRanges); + ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext); @Override default void close() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java new file mode 100644 index 000000000000..998f330844ce --- /dev/null +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java @@ -0,0 +1,151 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.parquet.reader; + +import io.airlift.slice.BasicSliceInput; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.parquet.ChunkReader; +import io.trino.parquet.ParquetReaderOptions; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Iterator; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkPositionIndexes; +import static com.google.common.io.ByteStreams.readFully; +import static java.util.Objects.requireNonNull; + +/** + * A single continuous {@link InputStream} over multiple {@link Slice}s read on demand using given collection of {@link ChunkReader}s. + * It is used to read parquet column chunk in limited (small) byte chunks (8MB by default, controlled by {@link ParquetReaderOptions#getMaxReadBlockSize()}). + * Column chunks consists of multiple pages. + * This abstraction is used because the page size is unknown until the page header is read + * and page header and page data can be split between two or more byte chunks. + */ +public final class ChunkedInputStream + extends InputStream +{ + private final Iterator chunks; + private ChunkReader currentChunkReader; + private BasicSliceInput current; + + public ChunkedInputStream(Collection chunks) + { + requireNonNull(chunks, "chunks is null"); + checkArgument(!chunks.isEmpty(), "At least one chunk is expected but got none"); + this.chunks = chunks.iterator(); + readNextChunk(); + } + + public Slice getSlice(int length) + throws IOException + { + if (length == 0) { + return Slices.EMPTY_SLICE; + } + ensureOpen(); + while (!current.isReadable()) { + checkArgument(chunks.hasNext(), "Requested %s bytes but 0 was available", length); + readNextChunk(); + } + if (current.available() >= length) { + return current.readSlice(length); + } + // requested length crosses the slice boundary + byte[] bytes = new byte[length]; + try { + readFully(this, bytes, 0, bytes.length); + } + catch (IOException e) { + throw new RuntimeException("Failed to read " + length + " bytes", e); + } + return Slices.wrappedBuffer(bytes); + } + + @Override + public int read(byte[] b, int off, int len) + throws IOException + { + checkPositionIndexes(off, off + len, b.length); + if (len == 0) { + return 0; + } + ensureOpen(); + while (!current.isReadable()) { + if (!chunks.hasNext()) { + return -1; + } + readNextChunk(); + } + + return current.read(b, off, len); + } + + @Override + public int read() + throws IOException + { + ensureOpen(); + while (!current.isReadable() && chunks.hasNext()) { + readNextChunk(); + } + + return current.read(); + } + + @Override + public int available() + throws IOException + { + ensureOpen(); + return current.available(); + } + + @Override + public void close() + { + if (current == null) { + // already closed + return; + } + currentChunkReader.free(); + + while (chunks.hasNext()) { + chunks.next().free(); + } + current = null; + } + + private void ensureOpen() + throws IOException + { + if (current == null) { + throw new IOException("Stream closed"); + } + } + + private void readNextChunk() + { + if (currentChunkReader != null) { + currentChunkReader.free(); + } + currentChunkReader = chunks.next(); + Slice slice = currentChunkReader.readUnchecked(); + checkArgument(slice.length() > 0, "all chunks have to be not empty"); + current = slice.getInput(); + } +} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java 
b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java index b3a097a447c2..20b8b244642e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java @@ -13,45 +13,71 @@ */ package io.trino.parquet.reader; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV1; import io.trino.parquet.DataPageV2; import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +import javax.annotation.Nullable; import java.io.IOException; -import java.util.LinkedList; +import java.util.Iterator; +import java.util.Optional; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static io.trino.parquet.ParquetCompressionUtils.decompress; public final class PageReader { private final CompressionCodecName codec; - private final long valueCount; + private final boolean hasDictionaryPage; private final boolean hasNoNulls; - private final LinkedList compressedPages; - private final DictionaryPage compressedDictionaryPage; - - /** - * @param compressedPages This parameter will be mutated destructively as {@link DataPage} entries are removed as part of {@link #readPage()}. The caller - * should not retain a reference to this list after passing it in as a constructor argument. - */ - public PageReader(CompressionCodecName codec, - LinkedList compressedPages, - DictionaryPage compressedDictionaryPage, - long valueCount, - boolean hasNoNulls) + private final PeekingIterator compressedPages; + + private boolean dictionaryAlreadyRead; + private int dataPageReadCount; + + public static PageReader createPageReader( + ChunkedInputStream columnChunk, + ColumnChunkMetaData metadata, + ColumnDescriptor columnDescriptor, + @Nullable OffsetIndex offsetIndex, + Optional fileCreatedBy) { - this.codec = codec; - this.compressedPages = compressedPages; - this.compressedDictionaryPage = compressedDictionaryPage; - this.valueCount = valueCount; - this.hasNoNulls = hasNoNulls; + // Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data. + // Row-group column statistics can be used to identify such cases and switch to faster non-nullable read + // paths in FlatColumnReader. 
+ Statistics columnStatistics = metadata.getStatistics(); + boolean hasNoNulls = columnStatistics != null && columnStatistics.getNumNulls() == 0; + ParquetColumnChunkIterator compressedPages = new ParquetColumnChunkIterator( + fileCreatedBy, + new ColumnChunkDescriptor(columnDescriptor, metadata), + columnChunk, + offsetIndex); + return new PageReader(metadata.getCodec(), compressedPages, compressedPages.hasDictionaryPage(), hasNoNulls); } - public long getTotalValueCount() + @VisibleForTesting + public PageReader( + CompressionCodecName codec, + Iterator compressedPages, + boolean hasDictionaryPage, + boolean hasNoNulls) { - return valueCount; + this.codec = codec; + this.compressedPages = Iterators.peekingIterator(compressedPages); + this.hasDictionaryPage = hasDictionaryPage; + this.hasNoNulls = hasNoNulls; } public boolean hasNoNulls() @@ -61,10 +87,14 @@ public boolean hasNoNulls() public DataPage readPage() { - if (compressedPages.isEmpty()) { + if (hasDictionaryPage) { + checkState(dictionaryAlreadyRead, "Dictionary has to be read first"); + } + if (!compressedPages.hasNext()) { return null; } - DataPage compressedPage = compressedPages.removeFirst(); + Page compressedPage = compressedPages.next(); + dataPageReadCount++; try { if (compressedPage instanceof DataPageV1) { DataPageV1 dataPageV1 = (DataPageV1) compressedPage; @@ -104,10 +134,16 @@ public DataPage readPage() public DictionaryPage readDictionaryPage() { - if (compressedDictionaryPage == null) { + if (!hasDictionaryPage) { return null; } try { + checkState(!dictionaryAlreadyRead, "Dictionary was already read"); + checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already"); + dictionaryAlreadyRead = true; + Page firstPage = compressedPages.next(); + checkArgument(firstPage instanceof DictionaryPage, "DictionaryPage has to be the first page in the column chunk but got %s", firstPage); + DictionaryPage compressedDictionaryPage = (DictionaryPage) firstPage; return new DictionaryPage( decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()), compressedDictionaryPage.getDictionarySize(), @@ -118,18 +154,28 @@ public DictionaryPage readDictionaryPage() } } - public DataPage getNextPage() + public boolean hasNext() { - return compressedPages.getFirst(); + return compressedPages.hasNext(); } - public boolean hasNext() + public DataPage getNextPage() { - return !compressedPages.isEmpty(); + verifyDictionaryPageRead(); + + return (DataPage) compressedPages.peek(); } public void skipNextPage() { - compressedPages.removeFirst(); + verifyDictionaryPageRead(); + compressedPages.next(); + } + + private void verifyDictionaryPageRead() + { + if (hasDictionaryPage) { + checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first"); + } } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunk.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java similarity index 60% rename from lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunk.java rename to lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java index f34f9a656286..99eb21234f0c 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunk.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java @@ -13,15 +13,12 @@ */ package io.trino.parquet.reader; -import 
io.airlift.slice.BasicSliceInput; -import io.airlift.slice.Slice; -import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV1; import io.trino.parquet.DataPageV2; import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; import io.trino.parquet.ParquetCorruptionException; import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.format.DataPageHeader; import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.DictionaryPageHeader; @@ -29,96 +26,93 @@ import org.apache.parquet.format.Util; import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import javax.annotation.Nullable; + import java.io.IOException; -import java.util.LinkedList; -import java.util.List; +import java.util.Iterator; import java.util.Optional; import java.util.OptionalLong; -import static com.google.common.base.Verify.verify; +import static com.google.common.base.Preconditions.checkArgument; import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding; import static java.util.Objects.requireNonNull; -public class ParquetColumnChunk +public final class ParquetColumnChunkIterator + implements Iterator { private final Optional fileCreatedBy; private final ColumnChunkDescriptor descriptor; - private final List slices; - private int sliceIndex; - private BasicSliceInput input; + private final ChunkedInputStream input; private final OffsetIndex offsetIndex; - public ParquetColumnChunk( + private long valueCount; + private int dataPageCount; + private boolean dictionaryWasRead; + + public ParquetColumnChunkIterator( Optional fileCreatedBy, ColumnChunkDescriptor descriptor, - List slices, - OffsetIndex offsetIndex) + ChunkedInputStream input, + @Nullable OffsetIndex offsetIndex) { this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null"); - this.descriptor = descriptor; - this.slices = slices; - this.sliceIndex = 0; - this.input = slices.get(0).getInput(); + this.descriptor = requireNonNull(descriptor, "descriptor is null"); + this.input = requireNonNull(input, "input is null"); this.offsetIndex = offsetIndex; } - private void advanceIfNecessary() + public boolean hasDictionaryPage() { - if (input.available() == 0) { - sliceIndex++; - if (sliceIndex < slices.size()) { - input = slices.get(sliceIndex).getInput(); - } - } + return descriptor.getColumnChunkMetaData().hasDictionaryPage(); } - protected PageHeader readPageHeader() - throws IOException + @Override + public boolean hasNext() { - verify(input.available() > 0, "Reached end of input unexpectedly"); - PageHeader pageHeader = Util.readPageHeader(input); - advanceIfNecessary(); - return pageHeader; + return hasMorePages(valueCount, dataPageCount) || (hasDictionaryPage() && !dictionaryWasRead); } - public PageReader readAllPages() - throws IOException + @Override + public Page next() { - LinkedList pages = new LinkedList<>(); - DictionaryPage dictionaryPage = null; - long valueCount = 0; - int dataPageCount = 0; - while (hasMorePages(valueCount, dataPageCount)) { + checkArgument(hasNext()); + + try { PageHeader pageHeader = readPageHeader(); int uncompressedPageSize = pageHeader.getUncompressed_page_size(); int compressedPageSize = pageHeader.getCompressed_page_size(); + Page result = null; switch (pageHeader.type) { case DICTIONARY_PAGE: - if (dictionaryPage != null) { - throw new ParquetCorruptionException("%s has more than one dictionary page in column chunk", descriptor.getColumnDescriptor()); + if (dataPageCount != 0) { 
+ throw new ParquetCorruptionException("%s has dictionary page at not first position in column chunk", descriptor.getColumnDescriptor()); } - dictionaryPage = readDictionaryPage(pageHeader, uncompressedPageSize, compressedPageSize); + result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size()); + dictionaryWasRead = true; break; case DATA_PAGE: - valueCount += readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, pages, getFirstRowIndex(dataPageCount, offsetIndex)); + result = readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, getFirstRowIndex(dataPageCount, offsetIndex)); ++dataPageCount; break; case DATA_PAGE_V2: - valueCount += readDataPageV2(pageHeader, uncompressedPageSize, compressedPageSize, pages, getFirstRowIndex(dataPageCount, offsetIndex)); + result = readDataPageV2(pageHeader, uncompressedPageSize, compressedPageSize, getFirstRowIndex(dataPageCount, offsetIndex)); ++dataPageCount; break; default: input.skip(compressedPageSize); - advanceIfNecessary(); break; } + return result; } - // Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data. - // Row-group column statistics can be used to identify such cases and switch to faster non-nullable read - // paths in FlatColumnReader. - Statistics columnStatistics = descriptor.getColumnChunkMetaData().getStatistics(); - boolean hasNoNulls = columnStatistics != null && columnStatistics.getNumNulls() == 0; - return new PageReader(descriptor.getColumnChunkMetaData().getCodec(), pages, dictionaryPage, valueCount, hasNoNulls); + catch (IOException e) { + throw new RuntimeException(e); + } + } + + private PageHeader readPageHeader() + throws IOException + { + return Util.readPageHeader(input); } private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) @@ -129,67 +123,61 @@ private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoF return dataPageCountReadSoFar < offsetIndex.getPageCount(); } - private Slice getSlice(int size) - { - Slice slice = input.readSlice(size); - advanceIfNecessary(); - return slice; - } - private DictionaryPage readDictionaryPage(PageHeader pageHeader, int uncompressedPageSize, int compressedPageSize) + throws IOException { DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); return new DictionaryPage( - getSlice(compressedPageSize), + input.getSlice(compressedPageSize), uncompressedPageSize, dicHeader.getNum_values(), getParquetEncoding(Encoding.valueOf(dicHeader.getEncoding().name()))); } - private long readDataPageV1( + private DataPageV1 readDataPageV1( PageHeader pageHeader, int uncompressedPageSize, int compressedPageSize, - List pages, OptionalLong firstRowIndex) + throws IOException { DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); - pages.add(new DataPageV1( - getSlice(compressedPageSize), + valueCount += dataHeaderV1.getNum_values(); + return new DataPageV1( + input.getSlice(compressedPageSize), dataHeaderV1.getNum_values(), uncompressedPageSize, firstRowIndex, getParquetEncoding(Encoding.valueOf(dataHeaderV1.getRepetition_level_encoding().name())), getParquetEncoding(Encoding.valueOf(dataHeaderV1.getDefinition_level_encoding().name())), - getParquetEncoding(Encoding.valueOf(dataHeaderV1.getEncoding().name())))); - return dataHeaderV1.getNum_values(); + getParquetEncoding(Encoding.valueOf(dataHeaderV1.getEncoding().name()))); } - private long readDataPageV2( + private DataPageV2 
readDataPageV2( PageHeader pageHeader, int uncompressedPageSize, int compressedPageSize, - List pages, OptionalLong firstRowIndex) + throws IOException { DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length(); - pages.add(new DataPageV2( + valueCount += dataHeaderV2.getNum_values(); + return new DataPageV2( dataHeaderV2.getNum_rows(), dataHeaderV2.getNum_nulls(), dataHeaderV2.getNum_values(), - getSlice(dataHeaderV2.getRepetition_levels_byte_length()), - getSlice(dataHeaderV2.getDefinition_levels_byte_length()), + input.getSlice(dataHeaderV2.getRepetition_levels_byte_length()), + input.getSlice(dataHeaderV2.getDefinition_levels_byte_length()), getParquetEncoding(Encoding.valueOf(dataHeaderV2.getEncoding().name())), - getSlice(dataSize), + input.getSlice(dataSize), uncompressedPageSize, firstRowIndex, MetadataReader.readStats( fileCreatedBy, Optional.ofNullable(dataHeaderV2.getStatistics()), descriptor.getColumnDescriptor().getPrimitiveType()), - dataHeaderV2.isIs_compressed())); - return dataHeaderV2.getNum_values(); + dataHeaderV2.isIs_compressed()); } private static OptionalLong getFirstRowIndex(int pageIndex, OffsetIndex offsetIndex) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index 14066ee813aa..a6f0cac9de39 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -17,12 +17,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import io.airlift.slice.Slice; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.parquet.ChunkKey; -import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.Field; import io.trino.parquet.GroupField; @@ -65,7 +61,6 @@ import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,6 +69,7 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.ParquetValidationUtils.validateParquet; import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation; @@ -81,6 +77,7 @@ import static io.trino.parquet.ParquetWriteValidation.WriteChecksumBuilder; import static io.trino.parquet.ParquetWriteValidation.WriteChecksumBuilder.createWriteChecksumBuilder; import static io.trino.parquet.reader.ListColumnReader.calculateCollectionOffsets; +import static io.trino.parquet.reader.PageReader.createPageReader; import static java.lang.Math.max; import static java.lang.Math.min; import static java.lang.Math.toIntExact; @@ -125,8 +122,7 @@ public class ParquetReader private final ParquetReaderOptions options; private int maxBatchSize = MAX_VECTOR_LENGTH; - private AggregatedMemoryContext currentRowGroupMemoryContext; - private final Multimap chunkReaders; + private final Map chunkReaders; private final List> columnIndexStore; private final Optional 
writeValidation; private final Optional writeChecksumBuilder; @@ -176,7 +172,6 @@ public ParquetReader( this.dataSource = requireNonNull(dataSource, "dataSource is null"); this.timeZone = requireNonNull(timeZone, "timeZone is null"); this.memoryContext = requireNonNull(memoryContext, "memoryContext is null"); - this.currentRowGroupMemoryContext = memoryContext.newAggregatedMemoryContext(); this.options = requireNonNull(options, "options is null"); this.columnReaders = new HashMap<>(); this.maxBytesPerCell = new HashMap<>(); @@ -241,15 +236,17 @@ public ParquetReader( } } this.codecMetrics = ImmutableMap.copyOf(codecMetrics); - this.chunkReaders = dataSource.planRead(ranges); + this.chunkReaders = dataSource.planRead(ranges, memoryContext).asMap().entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); } @Override public void close() throws IOException { - freeCurrentRowGroupBuffers(); - currentRowGroupMemoryContext.close(); + for (ChunkedInputStream chunkedInputStream : chunkReaders.values()) { + chunkedInputStream.close(); + } dataSource.close(); if (writeChecksumBuilder.isPresent()) { @@ -304,8 +301,6 @@ private int nextBatch() private boolean advanceToNextRowGroup() throws IOException { - currentRowGroupMemoryContext.close(); - currentRowGroupMemoryContext = memoryContext.newAggregatedMemoryContext(); freeCurrentRowGroupBuffers(); if (currentRowGroup >= 0 && rowGroupStatisticsValidation.isPresent()) { @@ -342,11 +337,9 @@ private void freeCurrentRowGroupBuffers() } for (int column = 0; column < primitiveFields.size(); column++) { - Collection readers = chunkReaders.get(new ChunkKey(column, currentRowGroup)); - if (readers != null) { - for (ChunkReader reader : readers) { - reader.free(); - } + ChunkedInputStream chunkedStream = chunkReaders.get(new ChunkKey(column, currentRowGroup)); + if (chunkedStream != null) { + chunkedStream.close(); } } } @@ -437,8 +430,10 @@ private ColumnChunk readPrimitive(PrimitiveField field) if (rowRanges != null) { offsetIndex = getFilteredOffsetIndex(rowRanges, currentRowGroup, currentBlockMetadata.getRowCount(), metadata.getPath()); } - List slices = allocateBlock(fieldId); - columnReader.setPageReader(createPageReader(slices, metadata, columnDescriptor, offsetIndex), Optional.ofNullable(rowRanges)); + ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup)); + columnReader.setPageReader( + createPageReader(columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy), + Optional.ofNullable(rowRanges)); } ColumnChunk columnChunk = columnReader.readPrimitive(); @@ -465,29 +460,6 @@ public Metrics getMetrics() return new Metrics(metrics.buildOrThrow()); } - private PageReader createPageReader(List slices, ColumnChunkMetaData metadata, ColumnDescriptor columnDescriptor, OffsetIndex offsetIndex) - throws IOException - { - ColumnChunkDescriptor descriptor = new ColumnChunkDescriptor(columnDescriptor, metadata); - ParquetColumnChunk columnChunk = new ParquetColumnChunk(fileCreatedBy, descriptor, slices, offsetIndex); - return columnChunk.readAllPages(); - } - - private List allocateBlock(int fieldId) - throws IOException - { - Collection readers = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup)); - List slices = Lists.newArrayListWithExpectedSize(readers.size()); - for (ChunkReader reader : readers) { - Slice slice = reader.read(); - // todo this just an estimate and doesn't reflect actual retained memory - 
currentRowGroupMemoryContext.newLocalMemoryContext(ParquetReader.class.getSimpleName()) - .setBytes(slice.length()); - slices.add(slice); - } - return slices; - } - private ColumnChunkMetaData getColumnChunkMetaData(BlockMetaData blockMetaData, ColumnDescriptor columnDescriptor) throws IOException { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java index 264a3d377268..1dc845b9116b 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java @@ -60,10 +60,8 @@ public abstract class PrimitiveColumnReader private int nextBatchSize; private LevelReader repetitionReader; private LevelReader definitionReader; - private long totalValueCount; private PageReader pageReader; private Dictionary dictionary; - private int currentValueCount; private DataPage page; private int remainingValueCountInPage; private int readOffset; @@ -117,8 +115,6 @@ public void setPageReader(PageReader pageReader, Optional row else { dictionary = null; } - checkArgument(pageReader.getTotalValueCount() > 0, "page is empty"); - totalValueCount = pageReader.getTotalValueCount(); if (rowRanges.isPresent()) { indexIterator = rowRanges.get().getParquetRowRanges().iterator(); // If rowRanges is empty for a row-group, then no page needs to be read, and we should not reach here @@ -255,7 +251,6 @@ private void processValues(long valuesToRead, Runnable valueReader) private void seek() { - checkArgument(currentValueCount <= totalValueCount, "Already read all values in column chunk"); if (readOffset == 0) { return; } @@ -300,7 +295,6 @@ private void updateValueCounts(int valuesRead, int skipCount) valuesReader = null; } remainingValueCountInPage -= totalCount; - currentValueCount += valuesRead; } private ValuesReader readPageV1(DataPageV1 page) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java index cfbb2e016790..783310446b30 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimap; -import io.airlift.slice.Slice; import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetDataSource; @@ -35,13 +34,14 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static java.util.Objects.requireNonNull; /** @@ -102,9 +102,9 @@ public TrinoColumnIndexStore( public ColumnIndex getColumnIndex(ColumnPath column) { if (columnIndexStore == null) { - columnIndexStore = loadIndexes(dataSource, columnIndexReferences, (buffer, columnMetadata) -> { + columnIndexStore = loadIndexes(dataSource, columnIndexReferences, (inputStream, columnMetadata) -> { try { - return 
ParquetMetadataConverter.fromParquetColumnIndex(columnMetadata.getPrimitiveType(), Util.readColumnIndex(buffer.getInput())); + return ParquetMetadataConverter.fromParquetColumnIndex(columnMetadata.getPrimitiveType(), Util.readColumnIndex(inputStream)); } catch (IOException e) { throw new RuntimeException(e); @@ -119,9 +119,9 @@ public ColumnIndex getColumnIndex(ColumnPath column) public OffsetIndex getOffsetIndex(ColumnPath column) { if (offsetIndexStore == null) { - offsetIndexStore = loadIndexes(dataSource, offsetIndexReferences, (buffer, columnMetadata) -> { + offsetIndexStore = loadIndexes(dataSource, offsetIndexReferences, (inputStream, columnMetadata) -> { try { - return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(buffer.getInput())); + return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(inputStream)); } catch (IOException e) { throw new RuntimeException(e); @@ -135,7 +135,7 @@ public OffsetIndex getOffsetIndex(ColumnPath column) private static Map loadIndexes( ParquetDataSource dataSource, List indexMetadata, - BiFunction deserializer) + BiFunction deserializer) { // Merge multiple small reads of the file for indexes stored close to each other ListMultimap ranges = ArrayListMultimap.create(indexMetadata.size(), 1); @@ -143,15 +143,17 @@ private static Map loadIndexes( ranges.put(column.getPath(), column.getDiskRange()); } - Multimap chunkReaders = dataSource.planRead(ranges); + Multimap chunkReaders = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext()); + Map columnInputStreams = chunkReaders.asMap().entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); try { return indexMetadata.stream() .collect(toImmutableMap( ColumnIndexMetadata::getPath, - column -> deserializer.apply(getOnlyElement(chunkReaders.get(column.getPath())).readUnchecked(), column))); + column -> deserializer.apply(columnInputStreams.get(column.getPath()), column))); } finally { - chunkReaders.values().forEach(ChunkReader::free); + columnInputStreams.values().forEach(ChunkedInputStream::close); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java index 6989d3161f81..27e646de74b2 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java @@ -340,7 +340,6 @@ public void setPageReader(PageReader pageReader, Optional row plainValuesDecoder.read(dictionary, 0, size); dictionaryDecoder = new DictionaryDecoder<>(dictionary, columnAdapter); } - checkArgument(pageReader.getTotalValueCount() > 0, "page is empty"); this.rowRanges = createRowRangesIterator(rowRanges); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java index 0f98d2d50e69..51684e0af3b2 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java @@ -102,7 +102,7 @@ public int read() throws IOException { ColumnReader columnReader = ColumnReaderFactory.create(field, UTC, true); - columnReader.setPageReader(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages), null, dataPositions, false), 
Optional.empty()); + columnReader.setPageReader(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages).iterator(), false, false), Optional.empty()); int rowsRead = 0; while (rowsRead < dataPositions) { int remaining = dataPositions - rowsRead; diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java new file mode 100644 index 000000000000..b22a3e7c8209 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java @@ -0,0 +1,153 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.Slices.EMPTY_SLICE; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class TestChunkedInputStream +{ + @Test + public void empty() + throws IOException + { + assertThatThrownBy(() -> input(ImmutableList.of())).isInstanceOf(IllegalArgumentException.class); + } + + @Test(dataProvider = "chunks") + public void testInput(List chunks) + throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (byte[] chunk : chunks) { + out.write(chunk); + } + byte[] expectedBytes = out.toByteArray(); + byte[] buffer = new byte[expectedBytes.length + 1]; + + List slices = chunks.stream().map(Slices::wrappedBuffer).collect(toImmutableList()); + assertEquals(input(slices).readAllBytes(), expectedBytes); + assertEquals(input(slices).getSlice(expectedBytes.length).getBytes(), expectedBytes); + assertEquals(readAll(input(slices)), expectedBytes); + + assertEquals(input(slices).readNBytes(expectedBytes.length), expectedBytes); + + assertEquals(input(slices).read(buffer, 0, 0), 0); + assertEquals(input(slices).getSlice(0), EMPTY_SLICE); + + if (expectedBytes.length > 0) { + // read from one chunk only + assertEquals(input(slices).read(buffer, 0, 1), 1); + assertEquals(buffer[0], expectedBytes[0]); + } + + // rad more than total length + ChunkedInputStream input = input(slices); + int bytesRead = ByteStreams.read(input, buffer, 0, buffer.length); + assertEquals(bytesRead, expectedBytes.length > 0 ? 
expectedBytes.length : -1); + + // read after input is done returns -1 + assertEquals(input.read(), -1); + // getSlice(0) after input is done returns empty slice + assertEquals(input.getSlice(0), EMPTY_SLICE); + assertThatThrownBy(() -> input.getSlice(1)).isInstanceOf(IllegalArgumentException.class); + + assertEquals(input.read(buffer, 0, 1), -1); + + // verify available + ChunkedInputStream availableInput = input(slices); + assertEquals(availableInput.available(), chunks.get(0).length); + availableInput.skipNBytes(chunks.get(0).length); + assertEquals(availableInput.available(), 0); + if (chunks.size() > 1) { + availableInput.read(); + assertEquals(availableInput.available(), chunks.get(1).length - 1); + availableInput.skipNBytes(chunks.get(1).length - 1); + assertEquals(availableInput.available(), 0); + } + } + + @Test(dataProvider = "chunks") + public void testClose(List chunks) + throws IOException + { + List slices = chunks.stream().map(Slices::wrappedBuffer).collect(toImmutableList()); + + // close fresh input, not read + List chunksReaders = slices.stream().map(TestingChunkReader::new).collect(toImmutableList()); + ChunkedInputStream input = new ChunkedInputStream(chunksReaders); + input.close(); + for (TestingChunkReader chunksReader : chunksReaders) { + assertTrue(chunksReader.isFreed()); + } + + // close partially read input + chunksReaders = slices.stream().map(TestingChunkReader::new).collect(toImmutableList()); + input = new ChunkedInputStream(chunksReaders); + input.readNBytes(chunks.get(0).length); + input.close(); + for (TestingChunkReader chunksReader : chunksReaders) { + assertTrue(chunksReader.isFreed()); + } + + // close fully read input + chunksReaders = slices.stream().map(TestingChunkReader::new).collect(toImmutableList()); + input = new ChunkedInputStream(chunksReaders); + input.readNBytes(chunks.stream().mapToInt(chunk -> chunk.length).sum()); + input.close(); + for (TestingChunkReader chunksReader : chunksReaders) { + assertTrue(chunksReader.isFreed()); + } + } + + @DataProvider + public Object[][] chunks() + { + return new Object[][] { + {ImmutableList.of(new byte[] {1, 2, 3})}, + {ImmutableList.of(new byte[] {1, 2, 3}, new byte[] {1, 2})}, + {ImmutableList.of(new byte[] {1, 2, 3}, new byte[] {1}, new byte[] {1, 2})}, + }; + } + + private static byte[] readAll(ChunkedInputStream input) + throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int read; + while ((read = input.read()) != -1) { + out.write(read); + } + return out.toByteArray(); + } + + private static ChunkedInputStream input(List slices) + { + return new ChunkedInputStream(slices.stream().map(TestingChunkReader::new).collect(toImmutableList())); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java index 530749eb3dd2..fe8fabfe38cf 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java @@ -17,7 +17,7 @@ import io.airlift.slice.Slices; import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV2; -import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; import io.trino.parquet.PrimitiveField; import io.trino.parquet.reader.decoders.ValueDecoders; import io.trino.parquet.reader.flat.FlatColumnReader; @@ -38,7 +38,6 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; -import 
java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -350,16 +349,14 @@ private static PageReader getPageReader(List testingPages, boolean else { encoder = new PlainValuesWriter(1000, 1000, HeapByteBufferAllocator.getInstance()); } - LinkedList inputPages = new LinkedList<>(createDataPages(testingPages, encoder)); - DictionaryPage dictionaryPage = null; + List inputPages = createDataPages(testingPages, encoder); if (dictionaryEncoded) { - dictionaryPage = toTrinoDictionaryPage(encoder.toDictPageAndClose()); + inputPages = ImmutableList.builder().add(toTrinoDictionaryPage(encoder.toDictPageAndClose())).addAll(inputPages).build(); } return new PageReader( UNCOMPRESSED, - inputPages, - dictionaryPage, - pagesRowCount(testingPages), + inputPages.iterator(), + dictionaryEncoded, false); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java index ac6489f6a1d8..053bf128feff 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.time.LocalDateTime; -import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -108,7 +107,7 @@ public void testVariousTimestamps(TimestampType type, BiFunction(List.of(dataPage)), null, dataPage.getValueCount(), false), + new PageReader(UNCOMPRESSED, List.of(dataPage).iterator(), false, false), Optional.empty()); reader.prepareNextRead(valueCount); Block block = reader.readPrimitive().getBlock(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java new file mode 100644 index 000000000000..02097aa26ba6 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java @@ -0,0 +1,431 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.parquet.reader; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.compress.snappy.SnappyCompressor; +import io.airlift.compress.snappy.SnappyRawCompressor; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.parquet.DataPage; +import io.trino.parquet.DataPageV1; +import io.trino.parquet.DataPageV2; +import io.trino.parquet.DictionaryPage; +import io.trino.parquet.ParquetCorruptionException; +import io.trino.parquet.ParquetEncoding; +import io.trino.parquet.ParquetTypeUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.Encoding; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.parquet.reader.TestPageReader.DataPageType.V1; +import static io.trino.parquet.reader.TestPageReader.DataPageType.V2; +import static java.util.Objects.requireNonNull; +import static org.apache.parquet.column.Encoding.PLAIN; +import static org.apache.parquet.column.Encoding.RLE_DICTIONARY; +import static org.apache.parquet.format.PageType.DATA_PAGE_V2; +import static org.apache.parquet.format.PageType.DICTIONARY_PAGE; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +public class TestPageReader +{ + private static final byte[] DATA_PAGE = new byte[] {1, 2, 3}; + + @Test(dataProvider = "pageParameters") + public void singlePage(CompressionCodecName compressionCodec, DataPageType dataPageType) + throws Exception + { + int valueCount = 10; + byte[] compressedDataPage = dataPageType.compress(compressionCodec, DATA_PAGE); + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + dataPageType.setDataPageHeader(pageHeader, valueCount); + + ByteArrayOutputStream out = new ByteArrayOutputStream(20); + Util.writePageHeader(pageHeader, out); + int headerSize = out.size(); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + // single slice + 
assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(bytes))); + + // pageHeader split across two slices + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, 8)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, 8, bytes.length)))); + + // pageHeader split across many slices + int secondHeaderChunkOffset = 5; + int thirdHeaderChunkOffset = 11; + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, secondHeaderChunkOffset)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, secondHeaderChunkOffset, thirdHeaderChunkOffset)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, thirdHeaderChunkOffset, bytes.length)))); + + // page data split across two slices + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, headerSize + 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 1, bytes.length)))); + + // page data split across many slices + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, headerSize + 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 1, headerSize + 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 2, bytes.length)))); + } + + @Test(dataProvider = "pageParameters") + public void manyPages(CompressionCodecName compressionCodec, DataPageType dataPageType) + throws Exception + { + int totalValueCount = 30; + byte[] compressedDataPage = dataPageType.compress(compressionCodec, DATA_PAGE); + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + dataPageType.setDataPageHeader(pageHeader, 10); + + ByteArrayOutputStream out = new ByteArrayOutputStream(100); + Util.writePageHeader(pageHeader, out); + int headerSize = out.size(); + out.write(compressedDataPage); + // second page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + // third page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + // single slice + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(bytes))); + + // each page in its own split + int pageSize = headerSize + compressedDataPage.length; + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, pageSize)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize, pageSize * 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize * 2, bytes.length)))); + + // page start in the middle of the slice + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, pageSize - 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize - 2, pageSize * 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize * 2, bytes.length)))); + } + + @Test(dataProvider = "pageParameters") + public void dictionaryPage(CompressionCodecName compressionCodec, DataPageType dataPageType) + throws Exception + { + byte[] dictionaryPage = {4}; + byte[] compressedDictionaryPage = TestPageReader.compress(compressionCodec, dictionaryPage, 
0, dictionaryPage.length); + PageHeader dictionaryPageHeader = new PageHeader(DICTIONARY_PAGE, dictionaryPage.length, compressedDictionaryPage.length); + dictionaryPageHeader.setDictionary_page_header(new DictionaryPageHeader(3, Encoding.PLAIN)); + int totalValueCount = 30; + byte[] compressedDataPage = dataPageType.compress(compressionCodec, DATA_PAGE); + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + dataPageType.setDataPageHeader(pageHeader, 10); + + ByteArrayOutputStream out = new ByteArrayOutputStream(100); + Util.writePageHeader(dictionaryPageHeader, out); + int dictionaryHeaderSize = out.size(); + out.write(compressedDictionaryPage); + int dictionaryPageSize = out.size(); + + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + // second page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + // third page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + PageReader pageReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))); + DictionaryPage uncompressedDictionaryPage = pageReader.readDictionaryPage(); + assertThat(uncompressedDictionaryPage.getDictionarySize()).isEqualTo(dictionaryPageHeader.getDictionary_page_header().getNum_values()); + assertEncodingEquals(uncompressedDictionaryPage.getEncoding(), dictionaryPageHeader.getDictionary_page_header().getEncoding()); + assertThat(uncompressedDictionaryPage.getSlice()).isEqualTo(Slices.wrappedBuffer(dictionaryPage)); + + // single slice + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of(Slices.wrappedBuffer(bytes))); + + // only dictionary + assertPages(compressionCodec, 0, 0, pageHeader, compressedDataPage, true, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, dictionaryPageSize)))); + + // multiple slices dictionary + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, 0, dictionaryHeaderSize - 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryHeaderSize - 1, dictionaryPageSize - 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize - 1, dictionaryPageSize + 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize + 1, bytes.length)))); + } + + @Test + public void dictionaryPageNotFirst() + throws Exception + { + byte[] dictionaryPage = {4}; + CompressionCodecName compressionCodec = UNCOMPRESSED; + byte[] compressedDictionaryPage = TestPageReader.compress(compressionCodec, dictionaryPage, 0, dictionaryPage.length); + PageHeader dictionaryPageHeader = new PageHeader(DICTIONARY_PAGE, dictionaryPage.length, compressedDictionaryPage.length); + dictionaryPageHeader.setDictionary_page_header(new DictionaryPageHeader(3, Encoding.PLAIN)); + DataPageType dataPageType = V2; + byte[] compressedDataPage = DATA_PAGE; + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + int valueCount = 10; + dataPageType.setDataPageHeader(pageHeader, valueCount); + + ByteArrayOutputStream out = new ByteArrayOutputStream(100); + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + + Util.writePageHeader(dictionaryPageHeader, out); + out.write(compressedDictionaryPage); + // write another page so that we have something to read after 
the first + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + int totalValueCount = valueCount * 2; + + // metadata says there is a dictionary but it's not the first page + assertThatThrownBy(() -> createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))).readDictionaryPage()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("DictionaryPage has to be the first page in the column chunk"); + + // metadata says there is no dictionary, but it's there as second page + PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes))); + assertTrue(pageReader.hasNext()); + pageReader.skipNextPage(); + assertThatThrownBy(pageReader::readPage).isInstanceOf(RuntimeException.class).hasCauseInstanceOf(ParquetCorruptionException.class); + } + + private static void assertSinglePage(CompressionCodecName compressionCodec, int valueCount, PageHeader pageHeader, byte[] compressedDataPage, List slices) + throws IOException + { + assertPages(compressionCodec, valueCount, 1, pageHeader, compressedDataPage, slices); + } + + private static void assertPages(CompressionCodecName compressionCodec, int valueCount, int pageCount, PageHeader pageHeader, byte[] compressedDataPage, List slices) + throws IOException + { + assertPages(compressionCodec, valueCount, pageCount, pageHeader, compressedDataPage, false, slices); + } + + private static void assertPages( + CompressionCodecName compressionCodec, + int valueCount, + int pageCount, + PageHeader pageHeader, + byte[] compressedDataPage, + boolean hasDictionary, + List slices) + throws IOException + { + PageReader pageReader = createPageReader(valueCount, compressionCodec, hasDictionary, slices); + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + assertEquals(dictionaryPage != null, hasDictionary); + + for (int i = 0; i < pageCount; i++) { + assertTrue(pageReader.hasNext()); + DataPage decompressedPage = pageReader.readPage(); + assertNotNull(decompressedPage); + assertDataPageEquals(pageHeader, DATA_PAGE, compressedDataPage, decompressedPage); + } + assertFalse(pageReader.hasNext()); + assertNull(pageReader.readPage()); + } + + @DataProvider + public Object[][] pageParameters() + { + return new Object[][] {{UNCOMPRESSED, V1}, {SNAPPY, V1}, {UNCOMPRESSED, V2}, {SNAPPY, V2}}; + } + + public enum DataPageType + { + V1(PageType.DATA_PAGE) { + @Override + public void setDataPageHeader(PageHeader pageHeader, int valueCount) + { + pageHeader.setData_page_header(new DataPageHeader(valueCount, Encoding.PLAIN, Encoding.PLAIN, Encoding.PLAIN)); + } + + @Override + public byte[] compress(CompressionCodecName compressionCodec, byte[] dataPage) + { + return TestPageReader.compress(compressionCodec, dataPage, 0, dataPage.length); + } + }, + V2(DATA_PAGE_V2) { + @Override + public void setDataPageHeader(PageHeader pageHeader, int valueCount) + { + pageHeader.setData_page_header_v2(new DataPageHeaderV2(valueCount, 0, valueCount, Encoding.PLAIN, 1, 1)); + } + + @Override + public byte[] compress(CompressionCodecName compressionCodec, byte[] dataPage) + { + // compress only the date, copy definition and repetition levels uncompressed + byte[] compressedData = TestPageReader.compress(compressionCodec, dataPage, 2, dataPage.length - 2); + Slice slice = Slices.allocate(2 + compressedData.length); + slice.setBytes(0, dataPage, 0, 2); + slice.setBytes(2, compressedData); + return 
slice.byteArray(); + } + }; + + private final PageType pageType; + + DataPageType(PageType pageType) + { + this.pageType = requireNonNull(pageType, "pageType is null"); + } + + public abstract void setDataPageHeader(PageHeader pageHeader, int valueCount); + + public PageType pageType() + { + return pageType; + } + + public abstract byte[] compress(CompressionCodecName compressionCodec, byte[] dataPage); + } + + private static byte[] compress(CompressionCodecName compressionCodec, byte[] bytes, int offset, int length) + { + if (compressionCodec == UNCOMPRESSED) { + return Arrays.copyOfRange(bytes, offset, offset + length); + } + if (compressionCodec == SNAPPY) { + byte[] out = new byte[SnappyRawCompressor.maxCompressedLength(length)]; + int compressedSize = new SnappyCompressor().compress(bytes, offset, length, out, 0, out.length); + return Arrays.copyOf(out, compressedSize); + } + throw new IllegalArgumentException("unsupported compression code " + compressionCodec); + } + + private static PageReader createPageReader(int valueCount, CompressionCodecName compressionCodec, boolean hasDictionary, List slices) + throws IOException + { + EncodingStats.Builder encodingStats = new EncodingStats.Builder(); + if (hasDictionary) { + encodingStats.addDictEncoding(PLAIN); + encodingStats.addDataEncoding(RLE_DICTIONARY); + } + ColumnChunkMetaData columnChunkMetaData = ColumnChunkMetaData.get( + ColumnPath.get(""), + INT32, + compressionCodec, + encodingStats.build(), + ImmutableSet.of(), + Statistics.createStats(Types.optional(INT32).named("fake_type")), + 0, + 0, + valueCount, + 0, + 0); + return PageReader.createPageReader( + new ChunkedInputStream(slices.stream().map(TestingChunkReader::new).collect(toImmutableList())), + columnChunkMetaData, + new ColumnDescriptor(new String[] {}, new PrimitiveType(REQUIRED, INT32, ""), 0, 0), + null, + Optional.empty()); + } + + private static void assertDataPageEquals(PageHeader pageHeader, byte[] dataPage, byte[] compressedDataPage, DataPage decompressedPage) + { + assertThat(decompressedPage.getUncompressedSize()).isEqualTo(pageHeader.getUncompressed_page_size()); + assertThat(pageHeader.getCompressed_page_size()).isEqualTo(compressedDataPage.length); + assertThat(decompressedPage.getFirstRowIndex()).isEmpty(); + if (PageType.DATA_PAGE.equals(pageHeader.getType())) { + assertDataPageV1(dataPage, pageHeader, decompressedPage); + } + if (DATA_PAGE_V2.equals(pageHeader.getType())) { + assertDataPageV2(dataPage, pageHeader, decompressedPage); + } + } + + private static void assertDataPageV1(byte[] dataPage, PageHeader pageHeader, DataPage decompressedPage) + { + DataPageHeader dataPageHeader = pageHeader.getData_page_header(); + assertThat(decompressedPage.getValueCount()).isEqualTo(dataPageHeader.getNum_values()); + assertThat(decompressedPage).isInstanceOf(DataPageV1.class); + DataPageV1 decompressedV1Page = (DataPageV1) decompressedPage; + assertThat(decompressedV1Page.getSlice()).isEqualTo(Slices.wrappedBuffer(dataPage)); + assertEncodingEquals(decompressedV1Page.getValueEncoding(), dataPageHeader.getEncoding()); + assertEncodingEquals(decompressedV1Page.getDefinitionLevelEncoding(), dataPageHeader.getDefinition_level_encoding()); + assertEncodingEquals(decompressedV1Page.getRepetitionLevelEncoding(), dataPageHeader.getRepetition_level_encoding()); + } + + private static void assertDataPageV2(byte[] dataPage, PageHeader pageHeader, DataPage decompressedPage) + { + DataPageHeaderV2 dataPageHeader = pageHeader.getData_page_header_v2(); + 
assertThat(decompressedPage.getValueCount()).isEqualTo(dataPageHeader.getNum_values()); + assertThat(decompressedPage).isInstanceOf(DataPageV2.class); + DataPageV2 decompressedV2Page = (DataPageV2) decompressedPage; + int dataOffset = dataPageHeader.getDefinition_levels_byte_length() + dataPageHeader.getRepetition_levels_byte_length(); + assertThat(decompressedV2Page.getSlice()).isEqualTo(Slices.wrappedBuffer(dataPage).slice(dataOffset, dataPage.length - dataOffset)); + assertEncodingEquals(decompressedV2Page.getDataEncoding(), dataPageHeader.getEncoding()); + + assertThat(decompressedV2Page.getRepetitionLevels()).isEqualTo(Slices.wrappedBuffer(dataPage).slice(0, 1)); + assertThat(decompressedV2Page.getDefinitionLevels()).isEqualTo(Slices.wrappedBuffer(dataPage).slice(1, 1)); + } + + private static void assertEncodingEquals(ParquetEncoding parquetEncoding, Encoding encoding) + { + assertThat(parquetEncoding).isEqualTo(ParquetTypeUtils.getParquetEncoding(org.apache.parquet.column.Encoding.valueOf(encoding.name()))); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java index 9b3105f46550..653fc1273ffe 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java @@ -15,10 +15,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; +import io.trino.memory.context.AggregatedMemoryContext; import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetReaderOptions; @@ -30,6 +32,7 @@ import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; public class TestParquetDataSource { @@ -42,13 +45,15 @@ public void testPlanReadOrdering(DataSize maxBufferSize) testingInput, new ParquetReaderOptions().withMaxBufferSize(maxBufferSize)); - ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder() - .putAll("test", new DiskRange(0, 300), new DiskRange(400, 100), new DiskRange(700, 200)) - .build()); + ListMultimap chunkReaders = dataSource.planRead( + ImmutableListMultimap.builder() + .putAll("test", new DiskRange(0, 200), new DiskRange(400, 100), new DiskRange(700, 200)) + .build(), + newSimpleAggregatedMemoryContext()); assertThat(chunkReaders.get("test")) .map(ChunkReader::read) .isEqualTo(ImmutableList.of( - testingInput.slice(0, 300), + testingInput.slice(0, 200), testingInput.slice(400, 100), testingInput.slice(700, 200))); } @@ -57,9 +62,48 @@ public void testPlanReadOrdering(DataSize maxBufferSize) public Object[][] testPlanReadOrderingProvider() { return new Object[][] { - {DataSize.ofBytes(0)}, // All large ranges {DataSize.ofBytes(200)}, // Mix of large and small ranges {DataSize.ofBytes(100000000)}, // All small ranges }; } + + @Test + public void testMemoryAccounting() + throws IOException + { + Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray()); + TestingParquetDataSource dataSource = new TestingParquetDataSource( + testingInput, + new 
ParquetReaderOptions().withMaxBufferSize(DataSize.ofBytes(500))); + AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); + ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder() + .put("1", new DiskRange(0, 200)) + .put("2", new DiskRange(400, 100)) + .put("3", new DiskRange(700, 200)) + .build(), + memoryContext); + + ChunkReader firstReader = Iterables.getOnlyElement(chunkReaders.get("1")); + ChunkReader secondReader = Iterables.getOnlyElement(chunkReaders.get("2")); + ChunkReader thirdReader = Iterables.getOnlyElement(chunkReaders.get("3")); + assertEquals(memoryContext.getBytes(), 0); + firstReader.read(); + // first and second range are merged + assertEquals(memoryContext.getBytes(), 500); + firstReader.free(); + // since the second reader is not freed, the memory is still retained + assertEquals(memoryContext.getBytes(), 500); + thirdReader.read(); + // third reader is standalone so only retains its size + assertEquals(memoryContext.getBytes(), 700); + thirdReader.free(); + // third reader is standalone, free releases the memory + assertEquals(memoryContext.getBytes(), 500); + secondReader.read(); + // second reader is merged with the first, read only accesses already cached data + assertEquals(memoryContext.getBytes(), 500); + secondReader.free(); + // both readers using merged reader are freed, all memory is released + assertEquals(memoryContext.getBytes(), 0); + } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingChunkReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingChunkReader.java new file mode 100644 index 000000000000..464860716f79 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingChunkReader.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.parquet.reader; + +import io.airlift.slice.Slice; +import io.trino.parquet.ChunkReader; + +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; + +public class TestingChunkReader + implements ChunkReader +{ + @Nullable + private Slice slice; + + public TestingChunkReader(Slice slice) + { + this.slice = requireNonNull(slice, "slice is null"); + } + + @Override + public Slice read() + { + return requireNonNull(slice, "slice is null"); + } + + @Override + public long getDiskOffset() + { + return 0; + } + + @Override + public void free() + { + slice = null; + } + + public boolean isFreed() + { + return slice == null; + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java index 1f9d3b4ce424..d75720005e40 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java @@ -13,12 +13,14 @@ */ package io.trino.parquet.reader.flat; +import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV1; import io.trino.parquet.DataPageV2; import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; import io.trino.parquet.ParquetEncoding; import io.trino.parquet.PrimitiveField; import io.trino.parquet.reader.ColumnReader; @@ -549,7 +551,7 @@ private static PageReader getSimplePageReaderMock(ParquetEncoding encoding) encoding, encoding, PLAIN)); - return new PageReader(UNCOMPRESSED, pages, null, 1, false); + return new PageReader(UNCOMPRESSED, pages.iterator(), false, false); } private static PageReader getNullOnlyPageReaderMock() @@ -567,7 +569,7 @@ private static PageReader getNullOnlyPageReaderMock() RLE, RLE, PLAIN)); - return new PageReader(UNCOMPRESSED, pages, null, 1, false); + return new PageReader(UNCOMPRESSED, pages.iterator(), false, false); } private static PageReader getPageReaderMock(LinkedList dataPages, @Nullable DictionaryPage dictionaryPage) @@ -577,13 +579,17 @@ private static PageReader getPageReaderMock(LinkedList dataPages, @Nul private static PageReader getPageReaderMock(LinkedList dataPages, @Nullable DictionaryPage dictionaryPage, boolean hasNoNulls) { + if (dictionaryPage != null) { + return new PageReader( + UNCOMPRESSED, + ImmutableList.builder().add(dictionaryPage).addAll(dataPages).build().iterator(), + true, + hasNoNulls); + } return new PageReader( UNCOMPRESSED, - dataPages, - dictionaryPage, - dataPages.stream() - .mapToInt(DataPage::getValueCount) - .sum(), + dataPages.iterator(), + false, hasNoNulls); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java index 817f206147d9..b91501a577e9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java @@ -18,6 +18,7 @@ import io.airlift.configuration.DefunctConfig; import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; +import io.airlift.units.MinDataSize; import io.trino.parquet.ParquetReaderOptions; import javax.validation.constraints.NotNull; @@ -73,6 +74,7 @@ public ParquetReaderConfig 
setMaxMergeDistance(DataSize distance) } @NotNull + @MinDataSize("1MB") public DataSize getMaxBufferSize() { return options.getMaxBufferSize(); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java index 7868d8771bc3..255a89365d78 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java @@ -27,8 +27,7 @@ protected QueryRunner createQueryRunner() .setHiveProperties( ImmutableMap.of( "parquet.use-column-index", "true", - // Small max-buffer-size allows testing mix of small and large ranges in HdfsParquetDataSource#planRead - "parquet.max-buffer-size", "400B", + "parquet.max-buffer-size", "1MB", "parquet.optimized-reader.enabled", "false")) .build(); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java index 32d033dd8d4b..f68b673d9137 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java @@ -27,8 +27,7 @@ protected QueryRunner createQueryRunner() .setHiveProperties( ImmutableMap.of( "parquet.use-column-index", "true", - // Small max-buffer-size allows testing mix of small and large ranges in HdfsParquetDataSource#planRead - "parquet.max-buffer-size", "400B", + "parquet.max-buffer-size", "1MB", "parquet.optimized-reader.enabled", "true")) .build(); } From b581c5276093287cf8d2a4acb288a25290e599df Mon Sep 17 00:00:00 2001 From: Lukasz Stec Date: Wed, 21 Dec 2022 12:04:35 +0100 Subject: [PATCH 6/6] Support reading large parquet column chunks We can allow for a big DiskRange to be passed to the ParquetDataSource.planRead, since it's going to split the ranges into small chunks anyway. 
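
For illustration only, a standalone sketch of that idea (not code from this
patch; the Range record and split helper are made up for the example, and the
real splitting happens inside AbstractParquetDataSource.planRead based on
parquet.max-buffer-size): a DiskRange may now describe a column chunk larger
than 2GB because it is broken into bounded read requests before any byte[]
is allocated.

    import java.util.ArrayList;
    import java.util.List;

    public class SplitRangeSketch
    {
        // Hypothetical stand-in for io.trino.parquet.DiskRange with a long length
        record Range(long offset, long length) {}

        // Split one large range into pieces no bigger than maxBufferSize,
        // so each piece can be read into a regular Java array
        static List<Range> split(Range range, long maxBufferSize)
        {
            List<Range> pieces = new ArrayList<>();
            long position = range.offset();
            long remaining = range.length();
            while (remaining > 0) {
                long pieceLength = Math.min(remaining, maxBufferSize);
                pieces.add(new Range(position, pieceLength));
                position += pieceLength;
                remaining -= pieceLength;
            }
            return pieces;
        }

        public static void main(String[] args)
        {
            // a 5 GiB column chunk read with an 8 MiB buffer -> 640 bounded reads
            List<Range> pieces = split(new Range(0, 5L << 30), 8L << 20);
            System.out.println(pieces.size() + " pieces, last = " + pieces.get(pieces.size() - 1));
        }
    }

The actual change below only widens DiskRange.length to long and guards each
ReferenceCountedReader against allocating beyond the JVM array size limit.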
--- .../java/io/trino/parquet/AbstractParquetDataSource.java | 7 +++++-- .../src/main/java/io/trino/parquet/DiskRange.java | 9 ++++----- .../main/java/io/trino/parquet/reader/ParquetReader.java | 4 ++-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java index f8edc10eb24f..89bac6f959b5 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java @@ -207,7 +207,7 @@ public Slice read() throws IOException { int offset = toIntExact(diskRange.getOffset() - mergedRange.getOffset()); - return mergedRangeLoader.read().slice(offset, diskRange.getLength()); + return mergedRangeLoader.read().slice(offset, toIntExact(diskRange.getLength())); } @Override @@ -278,6 +278,8 @@ private static List mergeAdjacentDiskRanges(Collection dis private class ReferenceCountedReader implements ChunkReader { + // See jdk.internal.util.ArraysSupport.SOFT_MAX_ARRAY_LENGTH for an explanation + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private final DiskRange range; private final LocalMemoryContext readerMemoryUsage; private Slice data; @@ -286,6 +288,7 @@ private class ReferenceCountedReader public ReferenceCountedReader(DiskRange range, AggregatedMemoryContext memoryContext) { this.range = range; + checkArgument(range.getLength() <= MAX_ARRAY_SIZE, "Cannot read range bigger than %s but got %s", MAX_ARRAY_SIZE, range); this.readerMemoryUsage = memoryContext.newLocalMemoryContext(ReferenceCountedReader.class.getSimpleName()); } @@ -308,7 +311,7 @@ public Slice read() checkState(referenceCount > 0, "Chunk reader is already closed"); if (data == null) { - byte[] buffer = new byte[range.getLength()]; + byte[] buffer = new byte[toIntExact(range.getLength())]; readFully(range.getOffset(), buffer, 0, buffer.length); data = Slices.wrappedBuffer(buffer); readerMemoryUsage.setBytes(data.length()); diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java b/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java index cfa24e4431dd..a755ece0e44c 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java @@ -17,16 +17,15 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; -import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; // same as io.trino.orc.DiskRange public final class DiskRange { private final long offset; - private final int length; + private final long length; - public DiskRange(long offset, int length) + public DiskRange(long offset, long length) { checkArgument(offset >= 0, "offset is negative"); checkArgument(length > 0, "length must be at least 1"); @@ -40,7 +39,7 @@ public long getOffset() return offset; } - public int getLength() + public long getLength() { return length; } @@ -65,7 +64,7 @@ public DiskRange span(DiskRange otherDiskRange) requireNonNull(otherDiskRange, "otherDiskRange is null"); long start = Math.min(this.offset, otherDiskRange.getOffset()); long end = Math.max(getEnd(), otherDiskRange.getEnd()); - return new DiskRange(start, toIntExact(end - start)); + return new DiskRange(start, end - start); } @Override diff --git 
a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index a6f0cac9de39..33c2e67c8968 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -214,14 +214,14 @@ public ParquetReader( filteredOffsetIndex = getFilteredOffsetIndex(blockRowRanges[rowGroup], rowGroup, rowGroupRowCount, columnPath); } if (filteredOffsetIndex == null) { - DiskRange range = new DiskRange(startingPosition, toIntExact(totalLength)); + DiskRange range = new DiskRange(startingPosition, totalLength); totalDataSize = range.getLength(); ranges.put(new ChunkKey(columnId, rowGroup), range); } else { List offsetRanges = filteredOffsetIndex.calculateOffsetRanges(startingPosition); for (OffsetRange offsetRange : offsetRanges) { - DiskRange range = new DiskRange(offsetRange.getOffset(), toIntExact(offsetRange.getLength())); + DiskRange range = new DiskRange(offsetRange.getOffset(), offsetRange.getLength()); totalDataSize += range.getLength(); ranges.put(new ChunkKey(columnId, rowGroup), range); }