Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -116,7 +120,7 @@ private void readFully(long position, byte[] buffer, int bufferOffset, int buffe
}

@Override
public final <K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange> diskRanges)
public final <K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext)
{
requireNonNull(diskRanges, "diskRanges is null");

Expand All @@ -136,24 +140,44 @@ public final <K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange
smallRangesBuilder.put(entry);
}
else {
largeRangesBuilder.put(entry);
largeRangesBuilder.putAll(entry.getKey(), splitLargeRange(entry.getValue()));
}
}
ListMultimap<K, DiskRange> smallRanges = smallRangesBuilder.build();
ListMultimap<K, DiskRange> largeRanges = largeRangesBuilder.build();

// read ranges
ImmutableListMultimap.Builder<K, ChunkReader> slices = ImmutableListMultimap.builder();
slices.putAll(readSmallDiskRanges(smallRanges));
slices.putAll(readLargeDiskRanges(largeRanges));
// Re-order ChunkReaders by their DiskRange offsets as ParquetColumnChunk expects
slices.putAll(readSmallDiskRanges(smallRanges, memoryContext));
slices.putAll(readLargeDiskRanges(largeRanges, memoryContext));
// Re-order ChunkReaders by their DiskRange offsets as ParquetColumnChunkIterator expects
// the input slices to be in the order that they're present in the file
slices.orderValuesBy(comparingLong(ChunkReader::getDiskOffset));

return slices.build();
}

private <K> ListMultimap<K, ChunkReader> readSmallDiskRanges(ListMultimap<K, DiskRange> diskRanges)
private List<DiskRange> splitLargeRange(DiskRange range)
{
int maxBufferSizeBytes = toIntExact(options.getMaxBufferSize().toBytes());
checkArgument(maxBufferSizeBytes > 0, "maxBufferSize must by larger than zero but is %s bytes", maxBufferSizeBytes);
ImmutableList.Builder<DiskRange> ranges = ImmutableList.builder();
long endOffset = range.getOffset() + range.getLength();
long offset = range.getOffset();
while (offset + maxBufferSizeBytes < endOffset) {
ranges.add(new DiskRange(offset, maxBufferSizeBytes));
offset += maxBufferSizeBytes;
}

long lengthLeft = endOffset - offset;
if (lengthLeft > 0) {
ranges.add(new DiskRange(offset, toIntExact(lengthLeft)));
}

return ranges.build();
}

private <K> ListMultimap<K, ChunkReader> readSmallDiskRanges(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext)
{
if (diskRanges.isEmpty()) {
return ImmutableListMultimap.of();
Expand All @@ -163,7 +187,7 @@ private <K> ListMultimap<K, ChunkReader> readSmallDiskRanges(ListMultimap<K, Dis

ImmutableListMultimap.Builder<K, ChunkReader> slices = ImmutableListMultimap.builder();
for (DiskRange mergedRange : mergedRanges) {
ReferenceCountedReader mergedRangeLoader = new ReferenceCountedReader(mergedRange);
ReferenceCountedReader mergedRangeLoader = new ReferenceCountedReader(mergedRange, memoryContext);

for (Map.Entry<K, DiskRange> diskRangeEntry : diskRanges.entries()) {
DiskRange diskRange = diskRangeEntry.getValue();
Expand All @@ -183,7 +207,7 @@ public Slice read()
throws IOException
{
int offset = toIntExact(diskRange.getOffset() - mergedRange.getOffset());
return mergedRangeLoader.read().slice(offset, diskRange.getLength());
return mergedRangeLoader.read().slice(offset, toIntExact(diskRange.getLength()));
}

@Override
Expand All @@ -203,15 +227,15 @@ public void free()
return sliceStreams;
}

private <K> ListMultimap<K, ChunkReader> readLargeDiskRanges(ListMultimap<K, DiskRange> diskRanges)
private <K> ListMultimap<K, ChunkReader> readLargeDiskRanges(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext)
{
if (diskRanges.isEmpty()) {
return ImmutableListMultimap.of();
}

ImmutableListMultimap.Builder<K, ChunkReader> slices = ImmutableListMultimap.builder();
for (Map.Entry<K, DiskRange> entry : diskRanges.entries()) {
slices.put(entry.getKey(), new ReferenceCountedReader(entry.getValue()));
slices.put(entry.getKey(), new ReferenceCountedReader(entry.getValue(), memoryContext));
}
return slices.build();
}
Expand Down Expand Up @@ -254,13 +278,18 @@ private static List<DiskRange> mergeAdjacentDiskRanges(Collection<DiskRange> dis
private class ReferenceCountedReader
implements ChunkReader
{
// See jdk.internal.util.ArraysSupport.SOFT_MAX_ARRAY_LENGTH for an explanation
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private final DiskRange range;
private final LocalMemoryContext readerMemoryUsage;
private Slice data;
private int referenceCount = 1;

public ReferenceCountedReader(DiskRange range)
public ReferenceCountedReader(DiskRange range, AggregatedMemoryContext memoryContext)
{
this.range = range;
checkArgument(range.getLength() <= MAX_ARRAY_SIZE, "Cannot read range bigger than %s but got %s", MAX_ARRAY_SIZE, range);
this.readerMemoryUsage = memoryContext.newLocalMemoryContext(ReferenceCountedReader.class.getSimpleName());
}

public void addReference()
Expand All @@ -282,9 +311,10 @@ public Slice read()
checkState(referenceCount > 0, "Chunk reader is already closed");

if (data == null) {
byte[] buffer = new byte[range.getLength()];
byte[] buffer = new byte[toIntExact(range.getLength())];
readFully(range.getOffset(), buffer, 0, buffer.length);
data = Slices.wrappedBuffer(buffer);
readerMemoryUsage.setBytes(data.length());
}

return data;
Expand All @@ -298,7 +328,17 @@ public void free()
referenceCount--;
if (referenceCount == 0) {
data = null;
readerMemoryUsage.setBytes(0);
}
}

// Diagnostic representation for debugging reader lifecycle issues:
// shows the backing disk range and the current reference count.
@Override
public String toString()
{
return toStringHelper(this)
.add("range", range)
.add("referenceCount", referenceCount)
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

// same as io.trino.orc.DiskRange
public final class DiskRange
{
private final long offset;
private final int length;
private final long length;

public DiskRange(long offset, int length)
public DiskRange(long offset, long length)
{
checkArgument(offset >= 0, "offset is negative");
checkArgument(length > 0, "length must be at least 1");
Expand All @@ -40,7 +39,7 @@ public long getOffset()
return offset;
}

public int getLength()
public long getLength()
{
return length;
}
Expand All @@ -65,7 +64,7 @@ public DiskRange span(DiskRange otherDiskRange)
requireNonNull(otherDiskRange, "otherDiskRange is null");
long start = Math.min(this.offset, otherDiskRange.getOffset());
long end = Math.max(getEnd(), otherDiskRange.getEnd());
return new DiskRange(start, toIntExact(end - start));
return new DiskRange(start, end - start);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ListMultimap;
import io.airlift.slice.Slice;
import io.trino.memory.context.AggregatedMemoryContext;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -36,7 +37,7 @@ Slice readTail(int length)
Slice readFully(long position, int length)
throws IOException;

<K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange> diskRanges);
<K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext);

@Override
default void close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.reader;

import io.airlift.slice.BasicSliceInput;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.parquet.ChunkReader;
import io.trino.parquet.ParquetReaderOptions;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Iterator;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.io.ByteStreams.readFully;
import static java.util.Objects.requireNonNull;

/**
* A single continuous {@link InputStream} over multiple {@link Slice}s read on demand using given collection of {@link ChunkReader}s.
* It is used to read parquet column chunk in limited (small) byte chunks (8MB by default, controlled by {@link ParquetReaderOptions#getMaxReadBlockSize()}).
* Column chunks consists of multiple pages.
* This abstraction is used because the page size is unknown until the page header is read
* and page header and page data can be split between two or more byte chunks.
*/
public final class ChunkedInputStream
extends InputStream
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
{
private final Iterator<? extends ChunkReader> chunks;
private ChunkReader currentChunkReader;
private BasicSliceInput current;

public ChunkedInputStream(Collection<? extends ChunkReader> chunks)
{
requireNonNull(chunks, "chunks is null");
checkArgument(!chunks.isEmpty(), "At least one chunk is expected but got none");
this.chunks = chunks.iterator();
readNextChunk();
}

public Slice getSlice(int length)
throws IOException
{
if (length == 0) {
return Slices.EMPTY_SLICE;
}
ensureOpen();
while (!current.isReadable()) {
checkArgument(chunks.hasNext(), "Requested %s bytes but 0 was available", length);
readNextChunk();
}
if (current.available() >= length) {
return current.readSlice(length);
}
// requested length crosses the slice boundary
byte[] bytes = new byte[length];
try {
readFully(this, bytes, 0, bytes.length);
}
catch (IOException e) {
throw new RuntimeException("Failed to read " + length + " bytes", e);
}
return Slices.wrappedBuffer(bytes);
}

@Override
public int read(byte[] b, int off, int len)
throws IOException
{
checkPositionIndexes(off, off + len, b.length);
if (len == 0) {
return 0;
}
ensureOpen();
while (!current.isReadable()) {
if (!chunks.hasNext()) {
return -1;
}
readNextChunk();
}

return current.read(b, off, len);
}

@Override
public int read()
throws IOException
{
ensureOpen();
while (!current.isReadable() && chunks.hasNext()) {
readNextChunk();
}

return current.read();
}

Comment thread
lukasz-stec marked this conversation as resolved.
Outdated
@Override
public int available()
throws IOException
{
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
ensureOpen();
return current.available();
}

@Override
public void close()
{
Comment thread
lukasz-stec marked this conversation as resolved.
Outdated
if (current == null) {
// already closed
return;
}
currentChunkReader.free();
Comment thread
raunaqmorarka marked this conversation as resolved.

while (chunks.hasNext()) {
chunks.next().free();
Comment thread
raunaqmorarka marked this conversation as resolved.
}
current = null;
}

private void ensureOpen()
throws IOException
{
if (current == null) {
throw new IOException("Stream closed");
}
}

private void readNextChunk()
{
if (currentChunkReader != null) {
currentChunkReader.free();
Comment thread
raunaqmorarka marked this conversation as resolved.
}
currentChunkReader = chunks.next();
Slice slice = currentChunkReader.readUnchecked();
checkArgument(slice.length() > 0, "all chunks have to be not empty");
current = slice.getInput();
}
}
Loading