Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
*/
package io.trino.parquet;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.parquet.reader.ChunkedInputStream;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -32,6 +35,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.lang.Math.toIntExact;
import static java.util.Comparator.comparingLong;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -120,14 +124,24 @@ private void readFully(long position, byte[] buffer, int bufferOffset, int buffe
}

@Override
public final <K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext)
public final <K> Map<K, ChunkedInputStream> planRead(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext)
{
requireNonNull(diskRanges, "diskRanges is null");

if (diskRanges.isEmpty()) {
return ImmutableListMultimap.of();
return ImmutableMap.of();
}

return planChunksRead(diskRanges, memoryContext).asMap()
.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue())));
}

@VisibleForTesting
public <K> ListMultimap<K, ChunkReader> planChunksRead(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext)
{
checkArgument(!diskRanges.isEmpty(), "diskRanges is empty");

//
// Note: this code does not use the stream APIs to avoid any extra object allocation
//
Expand Down Expand Up @@ -312,9 +326,9 @@ public Slice read()

if (data == null) {
byte[] buffer = new byte[toIntExact(range.getLength())];
readerMemoryUsage.setBytes(buffer.length);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: in practice it doesn't matter since memory usage is propagated in ScanFilterOperator in a lazy way anyway

readFully(range.getOffset(), buffer, 0, buffer.length);
data = Slices.wrappedBuffer(buffer);
readerMemoryUsage.setBytes(data.length());
}

return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import com.google.common.collect.ListMultimap;
import io.airlift.slice.Slice;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.parquet.reader.ChunkedInputStream;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

public interface ParquetDataSource
extends Closeable
Expand All @@ -37,7 +39,7 @@ Slice readTail(int length)
Slice readFully(long position, int length)
throws IOException;

<K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext);
<K> Map<K, ChunkedInputStream> planRead(ListMultimap<K, DiskRange> diskRanges, AggregatedMemoryContext memoryContext);

@Override
default void close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.io.ByteStreams.readFully;
import static io.airlift.slice.Slices.EMPTY_SLICE;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -41,21 +42,21 @@ public final class ChunkedInputStream
{
private final Iterator<? extends ChunkReader> chunks;
private ChunkReader currentChunkReader;
private BasicSliceInput current;
// current is explicitly initialized to EMPTY_SLICE as this field is set to null when the stream is closed
private BasicSliceInput current = EMPTY_SLICE.getInput();

public ChunkedInputStream(Collection<? extends ChunkReader> chunks)
{
requireNonNull(chunks, "chunks is null");
checkArgument(!chunks.isEmpty(), "At least one chunk is expected but got none");
this.chunks = chunks.iterator();
readNextChunk();
}

public Slice getSlice(int length)
throws IOException
{
if (length == 0) {
return Slices.EMPTY_SLICE;
return EMPTY_SLICE;
}
ensureOpen();
while (!current.isReadable()) {
Expand Down Expand Up @@ -122,8 +123,9 @@ public void close()
// already closed
return;
}
currentChunkReader.free();

if (currentChunkReader != null) {
currentChunkReader.free();
}
while (chunks.hasNext()) {
chunks.next().free();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.parquet.ParquetValidationUtils.validateParquet;
import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation;
Expand Down Expand Up @@ -236,8 +235,7 @@ public ParquetReader(
}
}
this.codecMetrics = ImmutableMap.copyOf(codecMetrics);
this.chunkReaders = dataSource.planRead(ranges, memoryContext).asMap().entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue())));
this.chunkReaders = dataSource.planRead(ranges, memoryContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import io.trino.parquet.ChunkReader;
import io.trino.parquet.DiskRange;
import io.trino.parquet.ParquetDataSource;
import org.apache.parquet.format.Util;
Expand Down Expand Up @@ -143,9 +141,7 @@ private static <T> Map<ColumnPath, T> loadIndexes(
ranges.put(column.getPath(), column.getDiskRange());
}

Multimap<ColumnPath, ChunkReader> chunkReaders = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext());
Map<ColumnPath, ChunkedInputStream> columnInputStreams = chunkReaders.asMap().entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue())));
Map<ColumnPath, ChunkedInputStream> columnInputStreams = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext());
try {
return indexMetadata.stream()
.collect(toImmutableMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ public void testInput(List<byte[]> chunks)

// verify available
ChunkedInputStream availableInput = input(slices);
assertEquals(availableInput.available(), chunks.get(0).length);
availableInput.skipNBytes(chunks.get(0).length);
// nothing is read initially
assertEquals(availableInput.available(), 0);
if (chunks.size() > 1) {
for (byte[] chunk : chunks) {
availableInput.read();
assertEquals(availableInput.available(), chunks.get(1).length - 1);
availableInput.skipNBytes(chunks.get(1).length - 1);
assertEquals(availableInput.available(), chunk.length - 1);
availableInput.skipNBytes(chunk.length - 1);
assertEquals(availableInput.available(), 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Map;
import java.util.stream.IntStream;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
Expand All @@ -45,7 +46,7 @@ public void testPlanReadOrdering(DataSize maxBufferSize)
testingInput,
new ParquetReaderOptions().withMaxBufferSize(maxBufferSize));

ListMultimap<String, ChunkReader> chunkReaders = dataSource.planRead(
ListMultimap<String, ChunkReader> chunkReaders = dataSource.planChunksRead(
ImmutableListMultimap.<String, DiskRange>builder()
.putAll("test", new DiskRange(0, 200), new DiskRange(400, 100), new DiskRange(700, 200))
.build(),
Expand Down Expand Up @@ -76,7 +77,7 @@ public void testMemoryAccounting()
testingInput,
new ParquetReaderOptions().withMaxBufferSize(DataSize.ofBytes(500)));
AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();
ListMultimap<String, ChunkReader> chunkReaders = dataSource.planRead(ImmutableListMultimap.<String, DiskRange>builder()
ListMultimap<String, ChunkReader> chunkReaders = dataSource.planChunksRead(ImmutableListMultimap.<String, DiskRange>builder()
.put("1", new DiskRange(0, 200))
.put("2", new DiskRange(400, 100))
.put("3", new DiskRange(700, 200))
Expand Down Expand Up @@ -106,4 +107,35 @@ public void testMemoryAccounting()
// both readers using merged reader are freed, all memory is released
assertEquals(memoryContext.getBytes(), 0);
}

@Test
public void testChunkedInputStreamLazyLoading()
throws IOException
{
// 4000-byte input (1000 ints); merge distance 0 keeps each DiskRange as its own chunk,
// buffer cap 500 bytes bounds any single read — TODO confirm against TestingParquetDataSource semantics
Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray());
TestingParquetDataSource dataSource = new TestingParquetDataSource(
testingInput,
new ParquetReaderOptions()
.withMaxBufferSize(DataSize.ofBytes(500))
.withMaxMergeDistance(DataSize.ofBytes(0)));
AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();
// Two logical streams: "1" spans ranges of 200 + 50 bytes, "2" spans 100 + 200 bytes
Map<String, ChunkedInputStream> inputStreams = dataSource.planRead(
ImmutableListMultimap.<String, DiskRange>builder()
.put("1", new DiskRange(0, 200))
.put("1", new DiskRange(250, 50))
.put("2", new DiskRange(400, 100))
.put("2", new DiskRange(700, 200))
.build(),
memoryContext);
// Planning alone must not allocate: chunks are read lazily on first access
assertThat(memoryContext.getBytes()).isEqualTo(0);

// Reading 200 bytes from stream "1" materializes only its first chunk (200 of 250 bytes)
inputStreams.get("1").getSlice(200);
assertThat(memoryContext.getBytes()).isEqualTo(200);

// Reading 100 bytes from stream "2" materializes only its first chunk; usage is additive
inputStreams.get("2").getSlice(100);
assertThat(memoryContext.getBytes()).isEqualTo(200 + 100);

// Closing stream "1" frees its buffered chunk; only stream "2"'s 100 bytes remain accounted
inputStreams.get("1").close();
assertThat(memoryContext.getBytes()).isEqualTo(100);
}
}