diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java index 350a88313f47..89bac6f959b5 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java @@ -19,6 +19,8 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; +import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.memory.context.LocalMemoryContext; import java.io.IOException; import java.util.ArrayList; @@ -26,6 +28,8 @@ import java.util.List; import java.util.Map; +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static java.lang.Math.toIntExact; @@ -116,7 +120,7 @@ private void readFully(long position, byte[] buffer, int bufferOffset, int buffe } @Override - public final ListMultimap planRead(ListMultimap diskRanges) + public final ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { requireNonNull(diskRanges, "diskRanges is null"); @@ -136,7 +140,7 @@ public final ListMultimap planRead(ListMultimap smallRanges = smallRangesBuilder.build(); @@ -144,16 +148,36 @@ public final ListMultimap planRead(ListMultimap slices = ImmutableListMultimap.builder(); - slices.putAll(readSmallDiskRanges(smallRanges)); - slices.putAll(readLargeDiskRanges(largeRanges)); - // Re-order ChunkReaders by their DiskRange offsets as ParquetColumnChunk expects + slices.putAll(readSmallDiskRanges(smallRanges, memoryContext)); + slices.putAll(readLargeDiskRanges(largeRanges, memoryContext)); + // Re-order ChunkReaders by their DiskRange offsets as ParquetColumnChunkIterator expects // the input slices to be in the order that they're present in the file slices.orderValuesBy(comparingLong(ChunkReader::getDiskOffset)); return slices.build(); } - private ListMultimap readSmallDiskRanges(ListMultimap diskRanges) + private List splitLargeRange(DiskRange range) + { + int maxBufferSizeBytes = toIntExact(options.getMaxBufferSize().toBytes()); + checkArgument(maxBufferSizeBytes > 0, "maxBufferSize must by larger than zero but is %s bytes", maxBufferSizeBytes); + ImmutableList.Builder ranges = ImmutableList.builder(); + long endOffset = range.getOffset() + range.getLength(); + long offset = range.getOffset(); + while (offset + maxBufferSizeBytes < endOffset) { + ranges.add(new DiskRange(offset, maxBufferSizeBytes)); + offset += maxBufferSizeBytes; + } + + long lengthLeft = endOffset - offset; + if (lengthLeft > 0) { + ranges.add(new DiskRange(offset, toIntExact(lengthLeft))); + } + + return ranges.build(); + } + + private ListMultimap readSmallDiskRanges(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { if (diskRanges.isEmpty()) { return ImmutableListMultimap.of(); @@ -163,7 +187,7 @@ private ListMultimap readSmallDiskRanges(ListMultimap slices = ImmutableListMultimap.builder(); for (DiskRange mergedRange : mergedRanges) { - ReferenceCountedReader mergedRangeLoader = new ReferenceCountedReader(mergedRange); + ReferenceCountedReader mergedRangeLoader = new ReferenceCountedReader(mergedRange, memoryContext); for (Map.Entry diskRangeEntry : diskRanges.entries()) { DiskRange diskRange = diskRangeEntry.getValue(); @@ -183,7 +207,7 @@ public Slice read() throws IOException { int offset = toIntExact(diskRange.getOffset() - mergedRange.getOffset()); - return mergedRangeLoader.read().slice(offset, diskRange.getLength()); + return mergedRangeLoader.read().slice(offset, toIntExact(diskRange.getLength())); } @Override @@ -203,7 +227,7 @@ public void free() return sliceStreams; } - private ListMultimap readLargeDiskRanges(ListMultimap diskRanges) + private ListMultimap readLargeDiskRanges(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { if (diskRanges.isEmpty()) { return ImmutableListMultimap.of(); @@ -211,7 +235,7 @@ private ListMultimap readLargeDiskRanges(ListMultimap slices = ImmutableListMultimap.builder(); for (Map.Entry entry : diskRanges.entries()) { - slices.put(entry.getKey(), new ReferenceCountedReader(entry.getValue())); + slices.put(entry.getKey(), new ReferenceCountedReader(entry.getValue(), memoryContext)); } return slices.build(); } @@ -254,13 +278,18 @@ private static List mergeAdjacentDiskRanges(Collection dis private class ReferenceCountedReader implements ChunkReader { + // See jdk.internal.util.ArraysSupport.SOFT_MAX_ARRAY_LENGTH for an explanation + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private final DiskRange range; + private final LocalMemoryContext readerMemoryUsage; private Slice data; private int referenceCount = 1; - public ReferenceCountedReader(DiskRange range) + public ReferenceCountedReader(DiskRange range, AggregatedMemoryContext memoryContext) { this.range = range; + checkArgument(range.getLength() <= MAX_ARRAY_SIZE, "Cannot read range bigger than %s but got %s", MAX_ARRAY_SIZE, range); + this.readerMemoryUsage = memoryContext.newLocalMemoryContext(ReferenceCountedReader.class.getSimpleName()); } public void addReference() @@ -282,9 +311,10 @@ public Slice read() checkState(referenceCount > 0, "Chunk reader is already closed"); if (data == null) { - byte[] buffer = new byte[range.getLength()]; + byte[] buffer = new byte[toIntExact(range.getLength())]; readFully(range.getOffset(), buffer, 0, buffer.length); data = Slices.wrappedBuffer(buffer); + readerMemoryUsage.setBytes(data.length()); } return data; @@ -298,7 +328,17 @@ public void free() referenceCount--; if (referenceCount == 0) { data = null; + readerMemoryUsage.setBytes(0); } } + + @Override + public String toString() + { + return toStringHelper(this) + .add("range", range) + .add("referenceCount", referenceCount) + .toString(); + } } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java b/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java index cfa24e4431dd..a755ece0e44c 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/DiskRange.java @@ -17,16 +17,15 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; -import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; // same as io.trino.orc.DiskRange public final class DiskRange { private final long offset; - private final int length; + private final long length; - public DiskRange(long offset, int length) + public DiskRange(long offset, long length) { checkArgument(offset >= 0, "offset is negative"); checkArgument(length > 0, "length must be at least 1"); @@ -40,7 +39,7 @@ public long getOffset() return offset; } - public int getLength() + public long getLength() { return length; } @@ -65,7 +64,7 @@ public DiskRange span(DiskRange otherDiskRange) requireNonNull(otherDiskRange, "otherDiskRange is null"); long start = Math.min(this.offset, otherDiskRange.getOffset()); long end = Math.max(getEnd(), otherDiskRange.getEnd()); - return new DiskRange(start, toIntExact(end - start)); + return new DiskRange(start, end - start); } @Override diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java index 4fe7ed74ec7b..cfa55200525e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java @@ -15,6 +15,7 @@ import com.google.common.collect.ListMultimap; import io.airlift.slice.Slice; +import io.trino.memory.context.AggregatedMemoryContext; import java.io.Closeable; import java.io.IOException; @@ -36,7 +37,7 @@ Slice readTail(int length) Slice readFully(long position, int length) throws IOException; - ListMultimap planRead(ListMultimap diskRanges); + ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext); @Override default void close() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java new file mode 100644 index 000000000000..998f330844ce --- /dev/null +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java @@ -0,0 +1,151 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import io.airlift.slice.BasicSliceInput; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.parquet.ChunkReader; +import io.trino.parquet.ParquetReaderOptions; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Iterator; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkPositionIndexes; +import static com.google.common.io.ByteStreams.readFully; +import static java.util.Objects.requireNonNull; + +/** + * A single continuous {@link InputStream} over multiple {@link Slice}s read on demand using given collection of {@link ChunkReader}s. + * It is used to read parquet column chunk in limited (small) byte chunks (8MB by default, controlled by {@link ParquetReaderOptions#getMaxReadBlockSize()}). + * Column chunks consists of multiple pages. + * This abstraction is used because the page size is unknown until the page header is read + * and page header and page data can be split between two or more byte chunks. + */ +public final class ChunkedInputStream + extends InputStream +{ + private final Iterator chunks; + private ChunkReader currentChunkReader; + private BasicSliceInput current; + + public ChunkedInputStream(Collection chunks) + { + requireNonNull(chunks, "chunks is null"); + checkArgument(!chunks.isEmpty(), "At least one chunk is expected but got none"); + this.chunks = chunks.iterator(); + readNextChunk(); + } + + public Slice getSlice(int length) + throws IOException + { + if (length == 0) { + return Slices.EMPTY_SLICE; + } + ensureOpen(); + while (!current.isReadable()) { + checkArgument(chunks.hasNext(), "Requested %s bytes but 0 was available", length); + readNextChunk(); + } + if (current.available() >= length) { + return current.readSlice(length); + } + // requested length crosses the slice boundary + byte[] bytes = new byte[length]; + try { + readFully(this, bytes, 0, bytes.length); + } + catch (IOException e) { + throw new RuntimeException("Failed to read " + length + " bytes", e); + } + return Slices.wrappedBuffer(bytes); + } + + @Override + public int read(byte[] b, int off, int len) + throws IOException + { + checkPositionIndexes(off, off + len, b.length); + if (len == 0) { + return 0; + } + ensureOpen(); + while (!current.isReadable()) { + if (!chunks.hasNext()) { + return -1; + } + readNextChunk(); + } + + return current.read(b, off, len); + } + + @Override + public int read() + throws IOException + { + ensureOpen(); + while (!current.isReadable() && chunks.hasNext()) { + readNextChunk(); + } + + return current.read(); + } + + @Override + public int available() + throws IOException + { + ensureOpen(); + return current.available(); + } + + @Override + public void close() + { + if (current == null) { + // already closed + return; + } + currentChunkReader.free(); + + while (chunks.hasNext()) { + chunks.next().free(); + } + current = null; + } + + private void ensureOpen() + throws IOException + { + if (current == null) { + throw new IOException("Stream closed"); + } + } + + private void readNextChunk() + { + if (currentChunkReader != null) { + currentChunkReader.free(); + } + currentChunkReader = chunks.next(); + Slice slice = currentChunkReader.readUnchecked(); + checkArgument(slice.length() > 0, "all chunks have to be not empty"); + current = slice.getInput(); + } +} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java index b3a097a447c2..20b8b244642e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java @@ -13,45 +13,71 @@ */ package io.trino.parquet.reader; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV1; import io.trino.parquet.DataPageV2; import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +import javax.annotation.Nullable; import java.io.IOException; -import java.util.LinkedList; +import java.util.Iterator; +import java.util.Optional; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static io.trino.parquet.ParquetCompressionUtils.decompress; public final class PageReader { private final CompressionCodecName codec; - private final long valueCount; + private final boolean hasDictionaryPage; private final boolean hasNoNulls; - private final LinkedList compressedPages; - private final DictionaryPage compressedDictionaryPage; - - /** - * @param compressedPages This parameter will be mutated destructively as {@link DataPage} entries are removed as part of {@link #readPage()}. The caller - * should not retain a reference to this list after passing it in as a constructor argument. - */ - public PageReader(CompressionCodecName codec, - LinkedList compressedPages, - DictionaryPage compressedDictionaryPage, - long valueCount, - boolean hasNoNulls) + private final PeekingIterator compressedPages; + + private boolean dictionaryAlreadyRead; + private int dataPageReadCount; + + public static PageReader createPageReader( + ChunkedInputStream columnChunk, + ColumnChunkMetaData metadata, + ColumnDescriptor columnDescriptor, + @Nullable OffsetIndex offsetIndex, + Optional fileCreatedBy) { - this.codec = codec; - this.compressedPages = compressedPages; - this.compressedDictionaryPage = compressedDictionaryPage; - this.valueCount = valueCount; - this.hasNoNulls = hasNoNulls; + // Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data. + // Row-group column statistics can be used to identify such cases and switch to faster non-nullable read + // paths in FlatColumnReader. + Statistics columnStatistics = metadata.getStatistics(); + boolean hasNoNulls = columnStatistics != null && columnStatistics.getNumNulls() == 0; + ParquetColumnChunkIterator compressedPages = new ParquetColumnChunkIterator( + fileCreatedBy, + new ColumnChunkDescriptor(columnDescriptor, metadata), + columnChunk, + offsetIndex); + return new PageReader(metadata.getCodec(), compressedPages, compressedPages.hasDictionaryPage(), hasNoNulls); } - public long getTotalValueCount() + @VisibleForTesting + public PageReader( + CompressionCodecName codec, + Iterator compressedPages, + boolean hasDictionaryPage, + boolean hasNoNulls) { - return valueCount; + this.codec = codec; + this.compressedPages = Iterators.peekingIterator(compressedPages); + this.hasDictionaryPage = hasDictionaryPage; + this.hasNoNulls = hasNoNulls; } public boolean hasNoNulls() @@ -61,10 +87,14 @@ public boolean hasNoNulls() public DataPage readPage() { - if (compressedPages.isEmpty()) { + if (hasDictionaryPage) { + checkState(dictionaryAlreadyRead, "Dictionary has to be read first"); + } + if (!compressedPages.hasNext()) { return null; } - DataPage compressedPage = compressedPages.removeFirst(); + Page compressedPage = compressedPages.next(); + dataPageReadCount++; try { if (compressedPage instanceof DataPageV1) { DataPageV1 dataPageV1 = (DataPageV1) compressedPage; @@ -104,10 +134,16 @@ public DataPage readPage() public DictionaryPage readDictionaryPage() { - if (compressedDictionaryPage == null) { + if (!hasDictionaryPage) { return null; } try { + checkState(!dictionaryAlreadyRead, "Dictionary was already read"); + checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already"); + dictionaryAlreadyRead = true; + Page firstPage = compressedPages.next(); + checkArgument(firstPage instanceof DictionaryPage, "DictionaryPage has to be the first page in the column chunk but got %s", firstPage); + DictionaryPage compressedDictionaryPage = (DictionaryPage) firstPage; return new DictionaryPage( decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()), compressedDictionaryPage.getDictionarySize(), @@ -118,18 +154,28 @@ public DictionaryPage readDictionaryPage() } } - public DataPage getNextPage() + public boolean hasNext() { - return compressedPages.getFirst(); + return compressedPages.hasNext(); } - public boolean hasNext() + public DataPage getNextPage() { - return !compressedPages.isEmpty(); + verifyDictionaryPageRead(); + + return (DataPage) compressedPages.peek(); } public void skipNextPage() { - compressedPages.removeFirst(); + verifyDictionaryPageRead(); + compressedPages.next(); + } + + private void verifyDictionaryPageRead() + { + if (hasDictionaryPage) { + checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first"); + } } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunk.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java similarity index 60% rename from lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunk.java rename to lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java index f34f9a656286..99eb21234f0c 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunk.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java @@ -13,15 +13,12 @@ */ package io.trino.parquet.reader; -import io.airlift.slice.BasicSliceInput; -import io.airlift.slice.Slice; -import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV1; import io.trino.parquet.DataPageV2; import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; import io.trino.parquet.ParquetCorruptionException; import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.format.DataPageHeader; import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.DictionaryPageHeader; @@ -29,96 +26,93 @@ import org.apache.parquet.format.Util; import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import javax.annotation.Nullable; + import java.io.IOException; -import java.util.LinkedList; -import java.util.List; +import java.util.Iterator; import java.util.Optional; import java.util.OptionalLong; -import static com.google.common.base.Verify.verify; +import static com.google.common.base.Preconditions.checkArgument; import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding; import static java.util.Objects.requireNonNull; -public class ParquetColumnChunk +public final class ParquetColumnChunkIterator + implements Iterator { private final Optional fileCreatedBy; private final ColumnChunkDescriptor descriptor; - private final List slices; - private int sliceIndex; - private BasicSliceInput input; + private final ChunkedInputStream input; private final OffsetIndex offsetIndex; - public ParquetColumnChunk( + private long valueCount; + private int dataPageCount; + private boolean dictionaryWasRead; + + public ParquetColumnChunkIterator( Optional fileCreatedBy, ColumnChunkDescriptor descriptor, - List slices, - OffsetIndex offsetIndex) + ChunkedInputStream input, + @Nullable OffsetIndex offsetIndex) { this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null"); - this.descriptor = descriptor; - this.slices = slices; - this.sliceIndex = 0; - this.input = slices.get(0).getInput(); + this.descriptor = requireNonNull(descriptor, "descriptor is null"); + this.input = requireNonNull(input, "input is null"); this.offsetIndex = offsetIndex; } - private void advanceIfNecessary() + public boolean hasDictionaryPage() { - if (input.available() == 0) { - sliceIndex++; - if (sliceIndex < slices.size()) { - input = slices.get(sliceIndex).getInput(); - } - } + return descriptor.getColumnChunkMetaData().hasDictionaryPage(); } - protected PageHeader readPageHeader() - throws IOException + @Override + public boolean hasNext() { - verify(input.available() > 0, "Reached end of input unexpectedly"); - PageHeader pageHeader = Util.readPageHeader(input); - advanceIfNecessary(); - return pageHeader; + return hasMorePages(valueCount, dataPageCount) || (hasDictionaryPage() && !dictionaryWasRead); } - public PageReader readAllPages() - throws IOException + @Override + public Page next() { - LinkedList pages = new LinkedList<>(); - DictionaryPage dictionaryPage = null; - long valueCount = 0; - int dataPageCount = 0; - while (hasMorePages(valueCount, dataPageCount)) { + checkArgument(hasNext()); + + try { PageHeader pageHeader = readPageHeader(); int uncompressedPageSize = pageHeader.getUncompressed_page_size(); int compressedPageSize = pageHeader.getCompressed_page_size(); + Page result = null; switch (pageHeader.type) { case DICTIONARY_PAGE: - if (dictionaryPage != null) { - throw new ParquetCorruptionException("%s has more than one dictionary page in column chunk", descriptor.getColumnDescriptor()); + if (dataPageCount != 0) { + throw new ParquetCorruptionException("%s has dictionary page at not first position in column chunk", descriptor.getColumnDescriptor()); } - dictionaryPage = readDictionaryPage(pageHeader, uncompressedPageSize, compressedPageSize); + result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size()); + dictionaryWasRead = true; break; case DATA_PAGE: - valueCount += readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, pages, getFirstRowIndex(dataPageCount, offsetIndex)); + result = readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, getFirstRowIndex(dataPageCount, offsetIndex)); ++dataPageCount; break; case DATA_PAGE_V2: - valueCount += readDataPageV2(pageHeader, uncompressedPageSize, compressedPageSize, pages, getFirstRowIndex(dataPageCount, offsetIndex)); + result = readDataPageV2(pageHeader, uncompressedPageSize, compressedPageSize, getFirstRowIndex(dataPageCount, offsetIndex)); ++dataPageCount; break; default: input.skip(compressedPageSize); - advanceIfNecessary(); break; } + return result; } - // Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data. - // Row-group column statistics can be used to identify such cases and switch to faster non-nullable read - // paths in FlatColumnReader. - Statistics columnStatistics = descriptor.getColumnChunkMetaData().getStatistics(); - boolean hasNoNulls = columnStatistics != null && columnStatistics.getNumNulls() == 0; - return new PageReader(descriptor.getColumnChunkMetaData().getCodec(), pages, dictionaryPage, valueCount, hasNoNulls); + catch (IOException e) { + throw new RuntimeException(e); + } + } + + private PageHeader readPageHeader() + throws IOException + { + return Util.readPageHeader(input); } private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) @@ -129,67 +123,61 @@ private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoF return dataPageCountReadSoFar < offsetIndex.getPageCount(); } - private Slice getSlice(int size) - { - Slice slice = input.readSlice(size); - advanceIfNecessary(); - return slice; - } - private DictionaryPage readDictionaryPage(PageHeader pageHeader, int uncompressedPageSize, int compressedPageSize) + throws IOException { DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); return new DictionaryPage( - getSlice(compressedPageSize), + input.getSlice(compressedPageSize), uncompressedPageSize, dicHeader.getNum_values(), getParquetEncoding(Encoding.valueOf(dicHeader.getEncoding().name()))); } - private long readDataPageV1( + private DataPageV1 readDataPageV1( PageHeader pageHeader, int uncompressedPageSize, int compressedPageSize, - List pages, OptionalLong firstRowIndex) + throws IOException { DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); - pages.add(new DataPageV1( - getSlice(compressedPageSize), + valueCount += dataHeaderV1.getNum_values(); + return new DataPageV1( + input.getSlice(compressedPageSize), dataHeaderV1.getNum_values(), uncompressedPageSize, firstRowIndex, getParquetEncoding(Encoding.valueOf(dataHeaderV1.getRepetition_level_encoding().name())), getParquetEncoding(Encoding.valueOf(dataHeaderV1.getDefinition_level_encoding().name())), - getParquetEncoding(Encoding.valueOf(dataHeaderV1.getEncoding().name())))); - return dataHeaderV1.getNum_values(); + getParquetEncoding(Encoding.valueOf(dataHeaderV1.getEncoding().name()))); } - private long readDataPageV2( + private DataPageV2 readDataPageV2( PageHeader pageHeader, int uncompressedPageSize, int compressedPageSize, - List pages, OptionalLong firstRowIndex) + throws IOException { DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length(); - pages.add(new DataPageV2( + valueCount += dataHeaderV2.getNum_values(); + return new DataPageV2( dataHeaderV2.getNum_rows(), dataHeaderV2.getNum_nulls(), dataHeaderV2.getNum_values(), - getSlice(dataHeaderV2.getRepetition_levels_byte_length()), - getSlice(dataHeaderV2.getDefinition_levels_byte_length()), + input.getSlice(dataHeaderV2.getRepetition_levels_byte_length()), + input.getSlice(dataHeaderV2.getDefinition_levels_byte_length()), getParquetEncoding(Encoding.valueOf(dataHeaderV2.getEncoding().name())), - getSlice(dataSize), + input.getSlice(dataSize), uncompressedPageSize, firstRowIndex, MetadataReader.readStats( fileCreatedBy, Optional.ofNullable(dataHeaderV2.getStatistics()), descriptor.getColumnDescriptor().getPrimitiveType()), - dataHeaderV2.isIs_compressed())); - return dataHeaderV2.getNum_values(); + dataHeaderV2.isIs_compressed()); } private static OptionalLong getFirstRowIndex(int pageIndex, OffsetIndex offsetIndex) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index 14066ee813aa..33c2e67c8968 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -17,12 +17,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import io.airlift.slice.Slice; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.parquet.ChunkKey; -import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.Field; import io.trino.parquet.GroupField; @@ -65,7 +61,6 @@ import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,6 +69,7 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.ParquetValidationUtils.validateParquet; import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation; @@ -81,6 +77,7 @@ import static io.trino.parquet.ParquetWriteValidation.WriteChecksumBuilder; import static io.trino.parquet.ParquetWriteValidation.WriteChecksumBuilder.createWriteChecksumBuilder; import static io.trino.parquet.reader.ListColumnReader.calculateCollectionOffsets; +import static io.trino.parquet.reader.PageReader.createPageReader; import static java.lang.Math.max; import static java.lang.Math.min; import static java.lang.Math.toIntExact; @@ -125,8 +122,7 @@ public class ParquetReader private final ParquetReaderOptions options; private int maxBatchSize = MAX_VECTOR_LENGTH; - private AggregatedMemoryContext currentRowGroupMemoryContext; - private final Multimap chunkReaders; + private final Map chunkReaders; private final List> columnIndexStore; private final Optional writeValidation; private final Optional writeChecksumBuilder; @@ -176,7 +172,6 @@ public ParquetReader( this.dataSource = requireNonNull(dataSource, "dataSource is null"); this.timeZone = requireNonNull(timeZone, "timeZone is null"); this.memoryContext = requireNonNull(memoryContext, "memoryContext is null"); - this.currentRowGroupMemoryContext = memoryContext.newAggregatedMemoryContext(); this.options = requireNonNull(options, "options is null"); this.columnReaders = new HashMap<>(); this.maxBytesPerCell = new HashMap<>(); @@ -219,14 +214,14 @@ public ParquetReader( filteredOffsetIndex = getFilteredOffsetIndex(blockRowRanges[rowGroup], rowGroup, rowGroupRowCount, columnPath); } if (filteredOffsetIndex == null) { - DiskRange range = new DiskRange(startingPosition, toIntExact(totalLength)); + DiskRange range = new DiskRange(startingPosition, totalLength); totalDataSize = range.getLength(); ranges.put(new ChunkKey(columnId, rowGroup), range); } else { List offsetRanges = filteredOffsetIndex.calculateOffsetRanges(startingPosition); for (OffsetRange offsetRange : offsetRanges) { - DiskRange range = new DiskRange(offsetRange.getOffset(), toIntExact(offsetRange.getLength())); + DiskRange range = new DiskRange(offsetRange.getOffset(), offsetRange.getLength()); totalDataSize += range.getLength(); ranges.put(new ChunkKey(columnId, rowGroup), range); } @@ -241,15 +236,17 @@ public ParquetReader( } } this.codecMetrics = ImmutableMap.copyOf(codecMetrics); - this.chunkReaders = dataSource.planRead(ranges); + this.chunkReaders = dataSource.planRead(ranges, memoryContext).asMap().entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); } @Override public void close() throws IOException { - freeCurrentRowGroupBuffers(); - currentRowGroupMemoryContext.close(); + for (ChunkedInputStream chunkedInputStream : chunkReaders.values()) { + chunkedInputStream.close(); + } dataSource.close(); if (writeChecksumBuilder.isPresent()) { @@ -304,8 +301,6 @@ private int nextBatch() private boolean advanceToNextRowGroup() throws IOException { - currentRowGroupMemoryContext.close(); - currentRowGroupMemoryContext = memoryContext.newAggregatedMemoryContext(); freeCurrentRowGroupBuffers(); if (currentRowGroup >= 0 && rowGroupStatisticsValidation.isPresent()) { @@ -342,11 +337,9 @@ private void freeCurrentRowGroupBuffers() } for (int column = 0; column < primitiveFields.size(); column++) { - Collection readers = chunkReaders.get(new ChunkKey(column, currentRowGroup)); - if (readers != null) { - for (ChunkReader reader : readers) { - reader.free(); - } + ChunkedInputStream chunkedStream = chunkReaders.get(new ChunkKey(column, currentRowGroup)); + if (chunkedStream != null) { + chunkedStream.close(); } } } @@ -437,8 +430,10 @@ private ColumnChunk readPrimitive(PrimitiveField field) if (rowRanges != null) { offsetIndex = getFilteredOffsetIndex(rowRanges, currentRowGroup, currentBlockMetadata.getRowCount(), metadata.getPath()); } - List slices = allocateBlock(fieldId); - columnReader.setPageReader(createPageReader(slices, metadata, columnDescriptor, offsetIndex), Optional.ofNullable(rowRanges)); + ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup)); + columnReader.setPageReader( + createPageReader(columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy), + Optional.ofNullable(rowRanges)); } ColumnChunk columnChunk = columnReader.readPrimitive(); @@ -465,29 +460,6 @@ public Metrics getMetrics() return new Metrics(metrics.buildOrThrow()); } - private PageReader createPageReader(List slices, ColumnChunkMetaData metadata, ColumnDescriptor columnDescriptor, OffsetIndex offsetIndex) - throws IOException - { - ColumnChunkDescriptor descriptor = new ColumnChunkDescriptor(columnDescriptor, metadata); - ParquetColumnChunk columnChunk = new ParquetColumnChunk(fileCreatedBy, descriptor, slices, offsetIndex); - return columnChunk.readAllPages(); - } - - private List allocateBlock(int fieldId) - throws IOException - { - Collection readers = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup)); - List slices = Lists.newArrayListWithExpectedSize(readers.size()); - for (ChunkReader reader : readers) { - Slice slice = reader.read(); - // todo this just an estimate and doesn't reflect actual retained memory - currentRowGroupMemoryContext.newLocalMemoryContext(ParquetReader.class.getSimpleName()) - .setBytes(slice.length()); - slices.add(slice); - } - return slices; - } - private ColumnChunkMetaData getColumnChunkMetaData(BlockMetaData blockMetaData, ColumnDescriptor columnDescriptor) throws IOException { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java index 6d7c7262e336..1dc845b9116b 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PrimitiveColumnReader.java @@ -60,10 +60,8 @@ public abstract class PrimitiveColumnReader private int nextBatchSize; private LevelReader repetitionReader; private LevelReader definitionReader; - private long totalValueCount; private PageReader pageReader; private Dictionary dictionary; - private int currentValueCount; private DataPage page; private int remainingValueCountInPage; private int readOffset; @@ -117,8 +115,6 @@ public void setPageReader(PageReader pageReader, Optional row else { dictionary = null; } - checkArgument(pageReader.getTotalValueCount() > 0, "page is empty"); - totalValueCount = pageReader.getTotalValueCount(); if (rowRanges.isPresent()) { indexIterator = rowRanges.get().getParquetRowRanges().iterator(); // If rowRanges is empty for a row-group, then no page needs to be read, and we should not reach here @@ -204,7 +200,7 @@ private void skipValues(long valuesToRead) * 100 │ p5 │ │ │ * └──────┴──────┴──────┘ * - * + *

* The pages 1, 2, 3 in col1 are skipped so we have to skip the rows [20, 79]. Because page 1 in col2 contains values * only for the rows [40, 79] we skip this entire page as well. To synchronize the row reading we have to skip the * values (and the related rl and dl) for the rows [20, 39] in the end of the page 0 for col2. Similarly, we have to @@ -255,7 +251,6 @@ private void processValues(long valuesToRead, Runnable valueReader) private void seek() { - checkArgument(currentValueCount <= totalValueCount, "Already read all values in column chunk"); if (readOffset == 0) { return; } @@ -300,7 +295,6 @@ private void updateValueCounts(int valuesRead, int skipCount) valuesReader = null; } remainingValueCountInPage -= totalCount; - currentValueCount += valuesRead; } private ValuesReader readPageV1(DataPageV1 page) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java index cfbb2e016790..783310446b30 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimap; -import io.airlift.slice.Slice; import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetDataSource; @@ -35,13 +34,14 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static java.util.Objects.requireNonNull; /** @@ -102,9 +102,9 @@ public TrinoColumnIndexStore( public ColumnIndex getColumnIndex(ColumnPath column) { if (columnIndexStore == null) { - columnIndexStore = loadIndexes(dataSource, columnIndexReferences, (buffer, columnMetadata) -> { + columnIndexStore = loadIndexes(dataSource, columnIndexReferences, (inputStream, columnMetadata) -> { try { - return ParquetMetadataConverter.fromParquetColumnIndex(columnMetadata.getPrimitiveType(), Util.readColumnIndex(buffer.getInput())); + return ParquetMetadataConverter.fromParquetColumnIndex(columnMetadata.getPrimitiveType(), Util.readColumnIndex(inputStream)); } catch (IOException e) { throw new RuntimeException(e); @@ -119,9 +119,9 @@ public ColumnIndex getColumnIndex(ColumnPath column) public OffsetIndex getOffsetIndex(ColumnPath column) { if (offsetIndexStore == null) { - offsetIndexStore = loadIndexes(dataSource, offsetIndexReferences, (buffer, columnMetadata) -> { + offsetIndexStore = loadIndexes(dataSource, offsetIndexReferences, (inputStream, columnMetadata) -> { try { - return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(buffer.getInput())); + return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(inputStream)); } catch (IOException e) { throw new RuntimeException(e); @@ -135,7 +135,7 @@ public OffsetIndex getOffsetIndex(ColumnPath column) private static Map loadIndexes( ParquetDataSource dataSource, List indexMetadata, - BiFunction deserializer) + BiFunction deserializer) { // Merge multiple small reads of the file for indexes stored close to each other ListMultimap ranges = ArrayListMultimap.create(indexMetadata.size(), 1); @@ -143,15 +143,17 @@ private static Map loadIndexes( ranges.put(column.getPath(), column.getDiskRange()); } - Multimap chunkReaders = dataSource.planRead(ranges); + Multimap chunkReaders = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext()); + Map columnInputStreams = chunkReaders.asMap().entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); try { return indexMetadata.stream() .collect(toImmutableMap( ColumnIndexMetadata::getPath, - column -> deserializer.apply(getOnlyElement(chunkReaders.get(column.getPath())).readUnchecked(), column))); + column -> deserializer.apply(columnInputStreams.get(column.getPath()), column))); } finally { - chunkReaders.values().forEach(ChunkReader::free); + columnInputStreams.values().forEach(ChunkedInputStream::close); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java index 6989d3161f81..27e646de74b2 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java @@ -340,7 +340,6 @@ public void setPageReader(PageReader pageReader, Optional row plainValuesDecoder.read(dictionary, 0, size); dictionaryDecoder = new DictionaryDecoder<>(dictionary, columnAdapter); } - checkArgument(pageReader.getTotalValueCount() > 0, "page is empty"); this.rowRanges = createRowRangesIterator(rowRanges); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java index 0f98d2d50e69..51684e0af3b2 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java @@ -102,7 +102,7 @@ public int read() throws IOException { ColumnReader columnReader = ColumnReaderFactory.create(field, UTC, true); - columnReader.setPageReader(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages), null, dataPositions, false), Optional.empty()); + columnReader.setPageReader(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages).iterator(), false, false), Optional.empty()); int rowsRead = 0; while (rowsRead < dataPositions) { int remaining = dataPositions - rowsRead; diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java new file mode 100644 index 000000000000..b22a3e7c8209 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java @@ -0,0 +1,153 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.Slices.EMPTY_SLICE; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class TestChunkedInputStream +{ + @Test + public void empty() + throws IOException + { + assertThatThrownBy(() -> input(ImmutableList.of())).isInstanceOf(IllegalArgumentException.class); + } + + @Test(dataProvider = "chunks") + public void testInput(List chunks) + throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (byte[] chunk : chunks) { + out.write(chunk); + } + byte[] expectedBytes = out.toByteArray(); + byte[] buffer = new byte[expectedBytes.length + 1]; + + List slices = chunks.stream().map(Slices::wrappedBuffer).collect(toImmutableList()); + assertEquals(input(slices).readAllBytes(), expectedBytes); + assertEquals(input(slices).getSlice(expectedBytes.length).getBytes(), expectedBytes); + assertEquals(readAll(input(slices)), expectedBytes); + + assertEquals(input(slices).readNBytes(expectedBytes.length), expectedBytes); + + assertEquals(input(slices).read(buffer, 0, 0), 0); + assertEquals(input(slices).getSlice(0), EMPTY_SLICE); + + if (expectedBytes.length > 0) { + // read from one chunk only + assertEquals(input(slices).read(buffer, 0, 1), 1); + assertEquals(buffer[0], expectedBytes[0]); + } + + // rad more than total length + ChunkedInputStream input = input(slices); + int bytesRead = ByteStreams.read(input, buffer, 0, buffer.length); + assertEquals(bytesRead, expectedBytes.length > 0 ? expectedBytes.length : -1); + + // read after input is done returns -1 + assertEquals(input.read(), -1); + // getSlice(0) after input is done returns empty slice + assertEquals(input.getSlice(0), EMPTY_SLICE); + assertThatThrownBy(() -> input.getSlice(1)).isInstanceOf(IllegalArgumentException.class); + + assertEquals(input.read(buffer, 0, 1), -1); + + // verify available + ChunkedInputStream availableInput = input(slices); + assertEquals(availableInput.available(), chunks.get(0).length); + availableInput.skipNBytes(chunks.get(0).length); + assertEquals(availableInput.available(), 0); + if (chunks.size() > 1) { + availableInput.read(); + assertEquals(availableInput.available(), chunks.get(1).length - 1); + availableInput.skipNBytes(chunks.get(1).length - 1); + assertEquals(availableInput.available(), 0); + } + } + + @Test(dataProvider = "chunks") + public void testClose(List chunks) + throws IOException + { + List slices = chunks.stream().map(Slices::wrappedBuffer).collect(toImmutableList()); + + // close fresh input, not read + List chunksReaders = slices.stream().map(TestingChunkReader::new).collect(toImmutableList()); + ChunkedInputStream input = new ChunkedInputStream(chunksReaders); + input.close(); + for (TestingChunkReader chunksReader : chunksReaders) { + assertTrue(chunksReader.isFreed()); + } + + // close partially read input + chunksReaders = slices.stream().map(TestingChunkReader::new).collect(toImmutableList()); + input = new ChunkedInputStream(chunksReaders); + input.readNBytes(chunks.get(0).length); + input.close(); + for (TestingChunkReader chunksReader : chunksReaders) { + assertTrue(chunksReader.isFreed()); + } + + // close fully read input + chunksReaders = slices.stream().map(TestingChunkReader::new).collect(toImmutableList()); + input = new ChunkedInputStream(chunksReaders); + input.readNBytes(chunks.stream().mapToInt(chunk -> chunk.length).sum()); + input.close(); + for (TestingChunkReader chunksReader : chunksReaders) { + assertTrue(chunksReader.isFreed()); + } + } + + @DataProvider + public Object[][] chunks() + { + return new Object[][] { + {ImmutableList.of(new byte[] {1, 2, 3})}, + {ImmutableList.of(new byte[] {1, 2, 3}, new byte[] {1, 2})}, + {ImmutableList.of(new byte[] {1, 2, 3}, new byte[] {1}, new byte[] {1, 2})}, + }; + } + + private static byte[] readAll(ChunkedInputStream input) + throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int read; + while ((read = input.read()) != -1) { + out.write(read); + } + return out.toByteArray(); + } + + private static ChunkedInputStream input(List slices) + { + return new ChunkedInputStream(slices.stream().map(TestingChunkReader::new).collect(toImmutableList())); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java index 530749eb3dd2..fe8fabfe38cf 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java @@ -17,7 +17,7 @@ import io.airlift.slice.Slices; import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV2; -import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; import io.trino.parquet.PrimitiveField; import io.trino.parquet.reader.decoders.ValueDecoders; import io.trino.parquet.reader.flat.FlatColumnReader; @@ -38,7 +38,6 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -350,16 +349,14 @@ private static PageReader getPageReader(List testingPages, boolean else { encoder = new PlainValuesWriter(1000, 1000, HeapByteBufferAllocator.getInstance()); } - LinkedList inputPages = new LinkedList<>(createDataPages(testingPages, encoder)); - DictionaryPage dictionaryPage = null; + List inputPages = createDataPages(testingPages, encoder); if (dictionaryEncoded) { - dictionaryPage = toTrinoDictionaryPage(encoder.toDictPageAndClose()); + inputPages = ImmutableList.builder().add(toTrinoDictionaryPage(encoder.toDictPageAndClose())).addAll(inputPages).build(); } return new PageReader( UNCOMPRESSED, - inputPages, - dictionaryPage, - pagesRowCount(testingPages), + inputPages.iterator(), + dictionaryEncoded, false); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java index ac6489f6a1d8..053bf128feff 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.time.LocalDateTime; -import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -108,7 +107,7 @@ public void testVariousTimestamps(TimestampType type, BiFunction(List.of(dataPage)), null, dataPage.getValueCount(), false), + new PageReader(UNCOMPRESSED, List.of(dataPage).iterator(), false, false), Optional.empty()); reader.prepareNextRead(valueCount); Block block = reader.readPrimitive().getBlock(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java new file mode 100644 index 000000000000..02097aa26ba6 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java @@ -0,0 +1,431 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.compress.snappy.SnappyCompressor; +import io.airlift.compress.snappy.SnappyRawCompressor; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.parquet.DataPage; +import io.trino.parquet.DataPageV1; +import io.trino.parquet.DataPageV2; +import io.trino.parquet.DictionaryPage; +import io.trino.parquet.ParquetCorruptionException; +import io.trino.parquet.ParquetEncoding; +import io.trino.parquet.ParquetTypeUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.Encoding; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.parquet.reader.TestPageReader.DataPageType.V1; +import static io.trino.parquet.reader.TestPageReader.DataPageType.V2; +import static java.util.Objects.requireNonNull; +import static org.apache.parquet.column.Encoding.PLAIN; +import static org.apache.parquet.column.Encoding.RLE_DICTIONARY; +import static org.apache.parquet.format.PageType.DATA_PAGE_V2; +import static org.apache.parquet.format.PageType.DICTIONARY_PAGE; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +public class TestPageReader +{ + private static final byte[] DATA_PAGE = new byte[] {1, 2, 3}; + + @Test(dataProvider = "pageParameters") + public void singlePage(CompressionCodecName compressionCodec, DataPageType dataPageType) + throws Exception + { + int valueCount = 10; + byte[] compressedDataPage = dataPageType.compress(compressionCodec, DATA_PAGE); + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + dataPageType.setDataPageHeader(pageHeader, valueCount); + + ByteArrayOutputStream out = new ByteArrayOutputStream(20); + Util.writePageHeader(pageHeader, out); + int headerSize = out.size(); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + // single slice + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(bytes))); + + // pageHeader split across two slices + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, 8)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, 8, bytes.length)))); + + // pageHeader split across many slices + int secondHeaderChunkOffset = 5; + int thirdHeaderChunkOffset = 11; + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, secondHeaderChunkOffset)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, secondHeaderChunkOffset, thirdHeaderChunkOffset)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, thirdHeaderChunkOffset, bytes.length)))); + + // page data split across two slices + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, headerSize + 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 1, bytes.length)))); + + // page data split across many slices + assertSinglePage(compressionCodec, valueCount, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, headerSize + 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 1, headerSize + 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 2, bytes.length)))); + } + + @Test(dataProvider = "pageParameters") + public void manyPages(CompressionCodecName compressionCodec, DataPageType dataPageType) + throws Exception + { + int totalValueCount = 30; + byte[] compressedDataPage = dataPageType.compress(compressionCodec, DATA_PAGE); + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + dataPageType.setDataPageHeader(pageHeader, 10); + + ByteArrayOutputStream out = new ByteArrayOutputStream(100); + Util.writePageHeader(pageHeader, out); + int headerSize = out.size(); + out.write(compressedDataPage); + // second page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + // third page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + // single slice + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(bytes))); + + // each page in its own split + int pageSize = headerSize + compressedDataPage.length; + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, pageSize)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize, pageSize * 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize * 2, bytes.length)))); + + // page start in the middle of the slice + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, pageSize - 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize - 2, pageSize * 2)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize * 2, bytes.length)))); + } + + @Test(dataProvider = "pageParameters") + public void dictionaryPage(CompressionCodecName compressionCodec, DataPageType dataPageType) + throws Exception + { + byte[] dictionaryPage = {4}; + byte[] compressedDictionaryPage = TestPageReader.compress(compressionCodec, dictionaryPage, 0, dictionaryPage.length); + PageHeader dictionaryPageHeader = new PageHeader(DICTIONARY_PAGE, dictionaryPage.length, compressedDictionaryPage.length); + dictionaryPageHeader.setDictionary_page_header(new DictionaryPageHeader(3, Encoding.PLAIN)); + int totalValueCount = 30; + byte[] compressedDataPage = dataPageType.compress(compressionCodec, DATA_PAGE); + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + dataPageType.setDataPageHeader(pageHeader, 10); + + ByteArrayOutputStream out = new ByteArrayOutputStream(100); + Util.writePageHeader(dictionaryPageHeader, out); + int dictionaryHeaderSize = out.size(); + out.write(compressedDictionaryPage); + int dictionaryPageSize = out.size(); + + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + // second page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + // third page + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + PageReader pageReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))); + DictionaryPage uncompressedDictionaryPage = pageReader.readDictionaryPage(); + assertThat(uncompressedDictionaryPage.getDictionarySize()).isEqualTo(dictionaryPageHeader.getDictionary_page_header().getNum_values()); + assertEncodingEquals(uncompressedDictionaryPage.getEncoding(), dictionaryPageHeader.getDictionary_page_header().getEncoding()); + assertThat(uncompressedDictionaryPage.getSlice()).isEqualTo(Slices.wrappedBuffer(dictionaryPage)); + + // single slice + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of(Slices.wrappedBuffer(bytes))); + + // only dictionary + assertPages(compressionCodec, 0, 0, pageHeader, compressedDataPage, true, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOf(bytes, dictionaryPageSize)))); + + // multiple slices dictionary + assertPages(compressionCodec, totalValueCount, 3, pageHeader, compressedDataPage, true, ImmutableList.of( + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, 0, dictionaryHeaderSize - 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryHeaderSize - 1, dictionaryPageSize - 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize - 1, dictionaryPageSize + 1)), + Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize + 1, bytes.length)))); + } + + @Test + public void dictionaryPageNotFirst() + throws Exception + { + byte[] dictionaryPage = {4}; + CompressionCodecName compressionCodec = UNCOMPRESSED; + byte[] compressedDictionaryPage = TestPageReader.compress(compressionCodec, dictionaryPage, 0, dictionaryPage.length); + PageHeader dictionaryPageHeader = new PageHeader(DICTIONARY_PAGE, dictionaryPage.length, compressedDictionaryPage.length); + dictionaryPageHeader.setDictionary_page_header(new DictionaryPageHeader(3, Encoding.PLAIN)); + DataPageType dataPageType = V2; + byte[] compressedDataPage = DATA_PAGE; + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + int valueCount = 10; + dataPageType.setDataPageHeader(pageHeader, valueCount); + + ByteArrayOutputStream out = new ByteArrayOutputStream(100); + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + + Util.writePageHeader(dictionaryPageHeader, out); + out.write(compressedDictionaryPage); + // write another page so that we have something to read after the first + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); + + int totalValueCount = valueCount * 2; + + // metadata says there is a dictionary but it's not the first page + assertThatThrownBy(() -> createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))).readDictionaryPage()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("DictionaryPage has to be the first page in the column chunk"); + + // metadata says there is no dictionary, but it's there as second page + PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes))); + assertTrue(pageReader.hasNext()); + pageReader.skipNextPage(); + assertThatThrownBy(pageReader::readPage).isInstanceOf(RuntimeException.class).hasCauseInstanceOf(ParquetCorruptionException.class); + } + + private static void assertSinglePage(CompressionCodecName compressionCodec, int valueCount, PageHeader pageHeader, byte[] compressedDataPage, List slices) + throws IOException + { + assertPages(compressionCodec, valueCount, 1, pageHeader, compressedDataPage, slices); + } + + private static void assertPages(CompressionCodecName compressionCodec, int valueCount, int pageCount, PageHeader pageHeader, byte[] compressedDataPage, List slices) + throws IOException + { + assertPages(compressionCodec, valueCount, pageCount, pageHeader, compressedDataPage, false, slices); + } + + private static void assertPages( + CompressionCodecName compressionCodec, + int valueCount, + int pageCount, + PageHeader pageHeader, + byte[] compressedDataPage, + boolean hasDictionary, + List slices) + throws IOException + { + PageReader pageReader = createPageReader(valueCount, compressionCodec, hasDictionary, slices); + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + assertEquals(dictionaryPage != null, hasDictionary); + + for (int i = 0; i < pageCount; i++) { + assertTrue(pageReader.hasNext()); + DataPage decompressedPage = pageReader.readPage(); + assertNotNull(decompressedPage); + assertDataPageEquals(pageHeader, DATA_PAGE, compressedDataPage, decompressedPage); + } + assertFalse(pageReader.hasNext()); + assertNull(pageReader.readPage()); + } + + @DataProvider + public Object[][] pageParameters() + { + return new Object[][] {{UNCOMPRESSED, V1}, {SNAPPY, V1}, {UNCOMPRESSED, V2}, {SNAPPY, V2}}; + } + + public enum DataPageType + { + V1(PageType.DATA_PAGE) { + @Override + public void setDataPageHeader(PageHeader pageHeader, int valueCount) + { + pageHeader.setData_page_header(new DataPageHeader(valueCount, Encoding.PLAIN, Encoding.PLAIN, Encoding.PLAIN)); + } + + @Override + public byte[] compress(CompressionCodecName compressionCodec, byte[] dataPage) + { + return TestPageReader.compress(compressionCodec, dataPage, 0, dataPage.length); + } + }, + V2(DATA_PAGE_V2) { + @Override + public void setDataPageHeader(PageHeader pageHeader, int valueCount) + { + pageHeader.setData_page_header_v2(new DataPageHeaderV2(valueCount, 0, valueCount, Encoding.PLAIN, 1, 1)); + } + + @Override + public byte[] compress(CompressionCodecName compressionCodec, byte[] dataPage) + { + // compress only the date, copy definition and repetition levels uncompressed + byte[] compressedData = TestPageReader.compress(compressionCodec, dataPage, 2, dataPage.length - 2); + Slice slice = Slices.allocate(2 + compressedData.length); + slice.setBytes(0, dataPage, 0, 2); + slice.setBytes(2, compressedData); + return slice.byteArray(); + } + }; + + private final PageType pageType; + + DataPageType(PageType pageType) + { + this.pageType = requireNonNull(pageType, "pageType is null"); + } + + public abstract void setDataPageHeader(PageHeader pageHeader, int valueCount); + + public PageType pageType() + { + return pageType; + } + + public abstract byte[] compress(CompressionCodecName compressionCodec, byte[] dataPage); + } + + private static byte[] compress(CompressionCodecName compressionCodec, byte[] bytes, int offset, int length) + { + if (compressionCodec == UNCOMPRESSED) { + return Arrays.copyOfRange(bytes, offset, offset + length); + } + if (compressionCodec == SNAPPY) { + byte[] out = new byte[SnappyRawCompressor.maxCompressedLength(length)]; + int compressedSize = new SnappyCompressor().compress(bytes, offset, length, out, 0, out.length); + return Arrays.copyOf(out, compressedSize); + } + throw new IllegalArgumentException("unsupported compression code " + compressionCodec); + } + + private static PageReader createPageReader(int valueCount, CompressionCodecName compressionCodec, boolean hasDictionary, List slices) + throws IOException + { + EncodingStats.Builder encodingStats = new EncodingStats.Builder(); + if (hasDictionary) { + encodingStats.addDictEncoding(PLAIN); + encodingStats.addDataEncoding(RLE_DICTIONARY); + } + ColumnChunkMetaData columnChunkMetaData = ColumnChunkMetaData.get( + ColumnPath.get(""), + INT32, + compressionCodec, + encodingStats.build(), + ImmutableSet.of(), + Statistics.createStats(Types.optional(INT32).named("fake_type")), + 0, + 0, + valueCount, + 0, + 0); + return PageReader.createPageReader( + new ChunkedInputStream(slices.stream().map(TestingChunkReader::new).collect(toImmutableList())), + columnChunkMetaData, + new ColumnDescriptor(new String[] {}, new PrimitiveType(REQUIRED, INT32, ""), 0, 0), + null, + Optional.empty()); + } + + private static void assertDataPageEquals(PageHeader pageHeader, byte[] dataPage, byte[] compressedDataPage, DataPage decompressedPage) + { + assertThat(decompressedPage.getUncompressedSize()).isEqualTo(pageHeader.getUncompressed_page_size()); + assertThat(pageHeader.getCompressed_page_size()).isEqualTo(compressedDataPage.length); + assertThat(decompressedPage.getFirstRowIndex()).isEmpty(); + if (PageType.DATA_PAGE.equals(pageHeader.getType())) { + assertDataPageV1(dataPage, pageHeader, decompressedPage); + } + if (DATA_PAGE_V2.equals(pageHeader.getType())) { + assertDataPageV2(dataPage, pageHeader, decompressedPage); + } + } + + private static void assertDataPageV1(byte[] dataPage, PageHeader pageHeader, DataPage decompressedPage) + { + DataPageHeader dataPageHeader = pageHeader.getData_page_header(); + assertThat(decompressedPage.getValueCount()).isEqualTo(dataPageHeader.getNum_values()); + assertThat(decompressedPage).isInstanceOf(DataPageV1.class); + DataPageV1 decompressedV1Page = (DataPageV1) decompressedPage; + assertThat(decompressedV1Page.getSlice()).isEqualTo(Slices.wrappedBuffer(dataPage)); + assertEncodingEquals(decompressedV1Page.getValueEncoding(), dataPageHeader.getEncoding()); + assertEncodingEquals(decompressedV1Page.getDefinitionLevelEncoding(), dataPageHeader.getDefinition_level_encoding()); + assertEncodingEquals(decompressedV1Page.getRepetitionLevelEncoding(), dataPageHeader.getRepetition_level_encoding()); + } + + private static void assertDataPageV2(byte[] dataPage, PageHeader pageHeader, DataPage decompressedPage) + { + DataPageHeaderV2 dataPageHeader = pageHeader.getData_page_header_v2(); + assertThat(decompressedPage.getValueCount()).isEqualTo(dataPageHeader.getNum_values()); + assertThat(decompressedPage).isInstanceOf(DataPageV2.class); + DataPageV2 decompressedV2Page = (DataPageV2) decompressedPage; + int dataOffset = dataPageHeader.getDefinition_levels_byte_length() + dataPageHeader.getRepetition_levels_byte_length(); + assertThat(decompressedV2Page.getSlice()).isEqualTo(Slices.wrappedBuffer(dataPage).slice(dataOffset, dataPage.length - dataOffset)); + assertEncodingEquals(decompressedV2Page.getDataEncoding(), dataPageHeader.getEncoding()); + + assertThat(decompressedV2Page.getRepetitionLevels()).isEqualTo(Slices.wrappedBuffer(dataPage).slice(0, 1)); + assertThat(decompressedV2Page.getDefinitionLevels()).isEqualTo(Slices.wrappedBuffer(dataPage).slice(1, 1)); + } + + private static void assertEncodingEquals(ParquetEncoding parquetEncoding, Encoding encoding) + { + assertThat(parquetEncoding).isEqualTo(ParquetTypeUtils.getParquetEncoding(org.apache.parquet.column.Encoding.valueOf(encoding.name()))); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java new file mode 100644 index 000000000000..653fc1273ffe --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.ListMultimap; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.airlift.units.DataSize; +import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.ChunkReader; +import io.trino.parquet.DiskRange; +import io.trino.parquet.ParquetReaderOptions; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.stream.IntStream; + +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public class TestParquetDataSource +{ + @Test(dataProvider = "testPlanReadOrderingProvider") + public void testPlanReadOrdering(DataSize maxBufferSize) + throws IOException + { + Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray()); + TestingParquetDataSource dataSource = new TestingParquetDataSource( + testingInput, + new ParquetReaderOptions().withMaxBufferSize(maxBufferSize)); + + ListMultimap chunkReaders = dataSource.planRead( + ImmutableListMultimap.builder() + .putAll("test", new DiskRange(0, 200), new DiskRange(400, 100), new DiskRange(700, 200)) + .build(), + newSimpleAggregatedMemoryContext()); + assertThat(chunkReaders.get("test")) + .map(ChunkReader::read) + .isEqualTo(ImmutableList.of( + testingInput.slice(0, 200), + testingInput.slice(400, 100), + testingInput.slice(700, 200))); + } + + @DataProvider + public Object[][] testPlanReadOrderingProvider() + { + return new Object[][] { + {DataSize.ofBytes(200)}, // Mix of large and small ranges + {DataSize.ofBytes(100000000)}, // All small ranges + }; + } + + @Test + public void testMemoryAccounting() + throws IOException + { + Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray()); + TestingParquetDataSource dataSource = new TestingParquetDataSource( + testingInput, + new ParquetReaderOptions().withMaxBufferSize(DataSize.ofBytes(500))); + AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); + ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder() + .put("1", new DiskRange(0, 200)) + .put("2", new DiskRange(400, 100)) + .put("3", new DiskRange(700, 200)) + .build(), + memoryContext); + + ChunkReader firstReader = Iterables.getOnlyElement(chunkReaders.get("1")); + ChunkReader secondReader = Iterables.getOnlyElement(chunkReaders.get("2")); + ChunkReader thirdReader = Iterables.getOnlyElement(chunkReaders.get("3")); + assertEquals(memoryContext.getBytes(), 0); + firstReader.read(); + // first and second range are merged + assertEquals(memoryContext.getBytes(), 500); + firstReader.free(); + // since the second reader is not freed, the memory is still retained + assertEquals(memoryContext.getBytes(), 500); + thirdReader.read(); + // third reader is standalone so only retains its size + assertEquals(memoryContext.getBytes(), 700); + thirdReader.free(); + // third reader is standalone, free releases the memory + assertEquals(memoryContext.getBytes(), 500); + secondReader.read(); + // second reader is merged with the first, read only accesses already cached data + assertEquals(memoryContext.getBytes(), 500); + secondReader.free(); + // both readers using merged reader are freed, all memory is released + assertEquals(memoryContext.getBytes(), 0); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingChunkReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingChunkReader.java new file mode 100644 index 000000000000..464860716f79 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingChunkReader.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import io.airlift.slice.Slice; +import io.trino.parquet.ChunkReader; + +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; + +public class TestingChunkReader + implements ChunkReader +{ + @Nullable + private Slice slice; + + public TestingChunkReader(Slice slice) + { + this.slice = requireNonNull(slice, "slice is null"); + } + + @Override + public Slice read() + { + return requireNonNull(slice, "slice is null"); + } + + @Override + public long getDiskOffset() + { + return 0; + } + + @Override + public void free() + { + slice = null; + } + + public boolean isFreed() + { + return slice == null; + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingParquetDataSource.java new file mode 100644 index 000000000000..1ae1fabdc322 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestingParquetDataSource.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import io.airlift.slice.Slice; +import io.trino.parquet.AbstractParquetDataSource; +import io.trino.parquet.ParquetDataSourceId; +import io.trino.parquet.ParquetReaderOptions; + +import java.io.IOException; +import java.util.UUID; + +import static java.lang.Math.toIntExact; + +public class TestingParquetDataSource + extends AbstractParquetDataSource +{ + private final Slice input; + + public TestingParquetDataSource(Slice slice, ParquetReaderOptions options) + throws IOException + { + super(new ParquetDataSourceId(UUID.randomUUID().toString()), slice.length(), options); + this.input = slice; + } + + @Override + protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + input.getBytes(toIntExact(position), buffer, bufferOffset, bufferLength); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java index 1f9d3b4ce424..d75720005e40 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java @@ -13,12 +13,14 @@ */ package io.trino.parquet.reader.flat; +import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV1; import io.trino.parquet.DataPageV2; import io.trino.parquet.DictionaryPage; +import io.trino.parquet.Page; import io.trino.parquet.ParquetEncoding; import io.trino.parquet.PrimitiveField; import io.trino.parquet.reader.ColumnReader; @@ -549,7 +551,7 @@ private static PageReader getSimplePageReaderMock(ParquetEncoding encoding) encoding, encoding, PLAIN)); - return new PageReader(UNCOMPRESSED, pages, null, 1, false); + return new PageReader(UNCOMPRESSED, pages.iterator(), false, false); } private static PageReader getNullOnlyPageReaderMock() @@ -567,7 +569,7 @@ private static PageReader getNullOnlyPageReaderMock() RLE, RLE, PLAIN)); - return new PageReader(UNCOMPRESSED, pages, null, 1, false); + return new PageReader(UNCOMPRESSED, pages.iterator(), false, false); } private static PageReader getPageReaderMock(LinkedList dataPages, @Nullable DictionaryPage dictionaryPage) @@ -577,13 +579,17 @@ private static PageReader getPageReaderMock(LinkedList dataPages, @Nul private static PageReader getPageReaderMock(LinkedList dataPages, @Nullable DictionaryPage dictionaryPage, boolean hasNoNulls) { + if (dictionaryPage != null) { + return new PageReader( + UNCOMPRESSED, + ImmutableList.builder().add(dictionaryPage).addAll(dataPages).build().iterator(), + true, + hasNoNulls); + } return new PageReader( UNCOMPRESSED, - dataPages, - dictionaryPage, - dataPages.stream() - .mapToInt(DataPage::getValueCount) - .sum(), + dataPages.iterator(), + false, hasNoNulls); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java index 817f206147d9..b91501a577e9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java @@ -18,6 +18,7 @@ import io.airlift.configuration.DefunctConfig; import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; +import io.airlift.units.MinDataSize; import io.trino.parquet.ParquetReaderOptions; import javax.validation.constraints.NotNull; @@ -73,6 +74,7 @@ public ParquetReaderConfig setMaxMergeDistance(DataSize distance) } @NotNull + @MinDataSize("1MB") public DataSize getMaxBufferSize() { return options.getMaxBufferSize(); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java index 7868d8771bc3..255a89365d78 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkipping.java @@ -27,8 +27,7 @@ protected QueryRunner createQueryRunner() .setHiveProperties( ImmutableMap.of( "parquet.use-column-index", "true", - // Small max-buffer-size allows testing mix of small and large ranges in HdfsParquetDataSource#planRead - "parquet.max-buffer-size", "400B", + "parquet.max-buffer-size", "1MB", "parquet.optimized-reader.enabled", "false")) .build(); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java index 32d033dd8d4b..f68b673d9137 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestParquetPageSkippingWithOptimizedReader.java @@ -27,8 +27,7 @@ protected QueryRunner createQueryRunner() .setHiveProperties( ImmutableMap.of( "parquet.use-column-index", "true", - // Small max-buffer-size allows testing mix of small and large ranges in HdfsParquetDataSource#planRead - "parquet.max-buffer-size", "400B", + "parquet.max-buffer-size", "1MB", "parquet.optimized-reader.enabled", "true")) .build(); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java deleted file mode 100644 index 802becf294f4..000000000000 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.hive.parquet; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableListMultimap; -import com.google.common.collect.ListMultimap; -import io.airlift.slice.Slice; -import io.airlift.slice.Slices; -import io.airlift.units.DataSize; -import io.trino.filesystem.TrinoFileSystem; -import io.trino.parquet.ChunkReader; -import io.trino.parquet.DiskRange; -import io.trino.parquet.ParquetReaderOptions; -import io.trino.plugin.hive.FileFormatDataSourceStats; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.UUID; -import java.util.stream.IntStream; - -import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import static io.trino.plugin.hive.HiveTestUtils.SESSION; -import static org.assertj.core.api.Assertions.assertThat; - -public class TestHdfsParquetDataSource -{ - @Test(dataProvider = "testPlanReadOrderingProvider") - public void testPlanReadOrdering(DataSize maxBufferSize) - throws IOException - { - Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray()); - String path = "/tmp/" + UUID.randomUUID(); - TrinoFileSystem trinoFileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); - try (OutputStream outputStream = trinoFileSystem.newOutputFile(path).create(newSimpleAggregatedMemoryContext())) { - outputStream.write(testingInput.getBytes()); - } - TrinoParquetDataSource dataSource = new TrinoParquetDataSource( - trinoFileSystem.newInputFile(path), - new ParquetReaderOptions().withMaxBufferSize(maxBufferSize), - new FileFormatDataSourceStats()); - - ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder() - .putAll("test", new DiskRange(0, 300), new DiskRange(400, 100), new DiskRange(700, 200)) - .build()); - assertThat(chunkReaders.get("test")) - .map(ChunkReader::read) - .isEqualTo(ImmutableList.of( - testingInput.slice(0, 300), - testingInput.slice(400, 100), - testingInput.slice(700, 200))); - } - - @DataProvider - public Object[][] testPlanReadOrderingProvider() - { - return new Object[][] { - {DataSize.ofBytes(0)}, // All large ranges - {DataSize.ofBytes(200)}, // Mix of large and small ranges - {DataSize.ofBytes(100000000)}, // All small ranges - }; - } -}