diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java index 89bac6f959b5..7bf97d49a891 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java @@ -13,14 +13,17 @@ */ package io.trino.parquet; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.memory.context.LocalMemoryContext; +import io.trino.parquet.reader.ChunkedInputStream; import java.io.IOException; import java.util.ArrayList; @@ -32,6 +35,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.lang.Math.toIntExact; import static java.util.Comparator.comparingLong; import static java.util.Objects.requireNonNull; @@ -120,14 +124,24 @@ private void readFully(long position, byte[] buffer, int bufferOffset, int buffe } @Override - public final ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) + public final Map planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { requireNonNull(diskRanges, "diskRanges is null"); if (diskRanges.isEmpty()) { - return ImmutableListMultimap.of(); + return ImmutableMap.of(); } + return planChunksRead(diskRanges, memoryContext).asMap() + .entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); + } + + @VisibleForTesting + public ListMultimap planChunksRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) + { + checkArgument(!diskRanges.isEmpty(), "diskRanges is empty"); + // // Note: this code does not use the stream APIs to avoid any extra object allocation // @@ -312,9 +326,9 @@ public Slice read() if (data == null) { byte[] buffer = new byte[toIntExact(range.getLength())]; + readerMemoryUsage.setBytes(buffer.length); readFully(range.getOffset(), buffer, 0, buffer.length); data = Slices.wrappedBuffer(buffer); - readerMemoryUsage.setBytes(data.length()); } return data; diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java index cfa55200525e..62ec264fbc5b 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java @@ -16,9 +16,11 @@ import com.google.common.collect.ListMultimap; import io.airlift.slice.Slice; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.reader.ChunkedInputStream; import java.io.Closeable; import java.io.IOException; +import java.util.Map; public interface ParquetDataSource extends Closeable @@ -37,7 +39,7 @@ Slice readTail(int length) Slice readFully(long position, int length) throws IOException; - ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext); + Map planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext); @Override default void close() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java index 998f330844ce..0bd40ccfae39 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java @@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkPositionIndexes; import static com.google.common.io.ByteStreams.readFully; +import static io.airlift.slice.Slices.EMPTY_SLICE; import static java.util.Objects.requireNonNull; /** @@ -41,21 +42,21 @@ public final class ChunkedInputStream { private final Iterator chunks; private ChunkReader currentChunkReader; - private BasicSliceInput current; + // current is explicitly initialized to EMPTY_SLICE as this field is set to null when the stream is closed + private BasicSliceInput current = EMPTY_SLICE.getInput(); public ChunkedInputStream(Collection chunks) { requireNonNull(chunks, "chunks is null"); checkArgument(!chunks.isEmpty(), "At least one chunk is expected but got none"); this.chunks = chunks.iterator(); - readNextChunk(); } public Slice getSlice(int length) throws IOException { if (length == 0) { - return Slices.EMPTY_SLICE; + return EMPTY_SLICE; } ensureOpen(); while (!current.isReadable()) { @@ -122,8 +123,9 @@ public void close() // already closed return; } - currentChunkReader.free(); - + if (currentChunkReader != null) { + currentChunkReader.free(); + } while (chunks.hasNext()) { chunks.next().free(); } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index 48828cc958be..64eb240db60e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -69,7 +69,6 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.ParquetValidationUtils.validateParquet; import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation; @@ -236,8 +235,7 @@ public ParquetReader( } } this.codecMetrics = ImmutableMap.copyOf(codecMetrics); - this.chunkReaders = dataSource.planRead(ranges, memoryContext).asMap().entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); + this.chunkReaders = dataSource.planRead(ranges, memoryContext); } @Override diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java index 783310446b30..413f65646007 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java @@ -16,8 +16,6 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Multimap; -import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetDataSource; import org.apache.parquet.format.Util; @@ -143,9 +141,7 @@ private static Map loadIndexes( ranges.put(column.getPath(), column.getDiskRange()); } - Multimap chunkReaders = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext()); - Map columnInputStreams = chunkReaders.asMap().entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); + Map columnInputStreams = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext()); try { return indexMetadata.stream() .collect(toImmutableMap( diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java index b22a3e7c8209..1153e759c931 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java @@ -81,13 +81,12 @@ public void testInput(List chunks) // verify available ChunkedInputStream availableInput = input(slices); - assertEquals(availableInput.available(), chunks.get(0).length); - availableInput.skipNBytes(chunks.get(0).length); + // nothing is read initially assertEquals(availableInput.available(), 0); - if (chunks.size() > 1) { + for (byte[] chunk : chunks) { availableInput.read(); - assertEquals(availableInput.available(), chunks.get(1).length - 1); - availableInput.skipNBytes(chunks.get(1).length - 1); + assertEquals(availableInput.available(), chunk.length - 1); + availableInput.skipNBytes(chunk.length - 1); assertEquals(availableInput.available(), 0); } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java index 653fc1273ffe..ac7a1ac1a3d9 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java @@ -28,6 +28,7 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.util.Map; import java.util.stream.IntStream; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; @@ -45,7 +46,7 @@ public void testPlanReadOrdering(DataSize maxBufferSize) testingInput, new ParquetReaderOptions().withMaxBufferSize(maxBufferSize)); - ListMultimap chunkReaders = dataSource.planRead( + ListMultimap chunkReaders = dataSource.planChunksRead( ImmutableListMultimap.builder() .putAll("test", new DiskRange(0, 200), new DiskRange(400, 100), new DiskRange(700, 200)) .build(), @@ -76,7 +77,7 @@ public void testMemoryAccounting() testingInput, new ParquetReaderOptions().withMaxBufferSize(DataSize.ofBytes(500))); AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); - ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder() + ListMultimap chunkReaders = dataSource.planChunksRead(ImmutableListMultimap.builder() .put("1", new DiskRange(0, 200)) .put("2", new DiskRange(400, 100)) .put("3", new DiskRange(700, 200)) @@ -106,4 +107,35 @@ public void testMemoryAccounting() // both readers using merged reader are freed, all memory is released assertEquals(memoryContext.getBytes(), 0); } + + @Test + public void testChunkedInputStreamLazyLoading() + throws IOException + { + Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray()); + TestingParquetDataSource dataSource = new TestingParquetDataSource( + testingInput, + new ParquetReaderOptions() + .withMaxBufferSize(DataSize.ofBytes(500)) + .withMaxMergeDistance(DataSize.ofBytes(0))); + AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); + Map inputStreams = dataSource.planRead( + ImmutableListMultimap.builder() + .put("1", new DiskRange(0, 200)) + .put("1", new DiskRange(250, 50)) + .put("2", new DiskRange(400, 100)) + .put("2", new DiskRange(700, 200)) + .build(), + memoryContext); + assertThat(memoryContext.getBytes()).isEqualTo(0); + + inputStreams.get("1").getSlice(200); + assertThat(memoryContext.getBytes()).isEqualTo(200); + + inputStreams.get("2").getSlice(100); + assertThat(memoryContext.getBytes()).isEqualTo(200 + 100); + + inputStreams.get("1").close(); + assertThat(memoryContext.getBytes()).isEqualTo(100); + } }