From 2c617a3e6432b0ebfb1c3bd4bf86d7274021f530 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Mon, 26 Dec 2022 16:02:40 +0530 Subject: [PATCH 1/3] Move creation of ChunkedInputStream into ParquetDataSource#planRead Avoids code duplication and makes the API easier to use --- .../parquet/AbstractParquetDataSource.java | 18 ++++++++++++++++-- .../io/trino/parquet/ParquetDataSource.java | 4 +++- .../io/trino/parquet/reader/ParquetReader.java | 4 +--- .../parquet/reader/TrinoColumnIndexStore.java | 6 +----- .../parquet/reader/TestParquetDataSource.java | 4 ++-- 5 files changed, 23 insertions(+), 13 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java index 89bac6f959b..0bfc3e3a0c1 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java @@ -13,14 +13,17 @@ */ package io.trino.parquet; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.memory.context.LocalMemoryContext; +import io.trino.parquet.reader.ChunkedInputStream; import java.io.IOException; import java.util.ArrayList; @@ -32,6 +35,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.lang.Math.toIntExact; import static java.util.Comparator.comparingLong; import static 
java.util.Objects.requireNonNull; @@ -120,14 +124,24 @@ private void readFully(long position, byte[] buffer, int bufferOffset, int buffe } @Override - public final ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) + public final Map planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) { requireNonNull(diskRanges, "diskRanges is null"); if (diskRanges.isEmpty()) { - return ImmutableListMultimap.of(); + return ImmutableMap.of(); } + return planChunksRead(diskRanges, memoryContext).asMap() + .entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); + } + + @VisibleForTesting + public ListMultimap planChunksRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext) + { + checkArgument(!diskRanges.isEmpty(), "diskRanges is empty"); + // // Note: this code does not use the stream APIs to avoid any extra object allocation // diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java index cfa55200525..62ec264fbc5 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java @@ -16,9 +16,11 @@ import com.google.common.collect.ListMultimap; import io.airlift.slice.Slice; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.reader.ChunkedInputStream; import java.io.Closeable; import java.io.IOException; +import java.util.Map; public interface ParquetDataSource extends Closeable @@ -37,7 +39,7 @@ Slice readTail(int length) Slice readFully(long position, int length) throws IOException; - ListMultimap planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext); + Map planRead(ListMultimap diskRanges, AggregatedMemoryContext memoryContext); @Override default void close() diff --git 
a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index 48828cc958b..64eb240db60 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -69,7 +69,6 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.ParquetValidationUtils.validateParquet; import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation; @@ -236,8 +235,7 @@ public ParquetReader( } } this.codecMetrics = ImmutableMap.copyOf(codecMetrics); - this.chunkReaders = dataSource.planRead(ranges, memoryContext).asMap().entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); + this.chunkReaders = dataSource.planRead(ranges, memoryContext); } @Override diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java index 783310446b3..413f6564600 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java @@ -16,8 +16,6 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Multimap; -import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetDataSource; import org.apache.parquet.format.Util; @@ -143,9 +141,7 @@ private static Map loadIndexes( ranges.put(column.getPath(), column.getDiskRange()); } - 
Multimap chunkReaders = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext()); - Map columnInputStreams = chunkReaders.asMap().entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> new ChunkedInputStream(entry.getValue()))); + Map columnInputStreams = dataSource.planRead(ranges, newSimpleAggregatedMemoryContext()); try { return indexMetadata.stream() .collect(toImmutableMap( diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java index 653fc1273ff..28872232e31 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java @@ -45,7 +45,7 @@ public void testPlanReadOrdering(DataSize maxBufferSize) testingInput, new ParquetReaderOptions().withMaxBufferSize(maxBufferSize)); - ListMultimap chunkReaders = dataSource.planRead( + ListMultimap chunkReaders = dataSource.planChunksRead( ImmutableListMultimap.builder() .putAll("test", new DiskRange(0, 200), new DiskRange(400, 100), new DiskRange(700, 200)) .build(), @@ -76,7 +76,7 @@ public void testMemoryAccounting() testingInput, new ParquetReaderOptions().withMaxBufferSize(DataSize.ofBytes(500))); AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); - ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder() + ListMultimap chunkReaders = dataSource.planChunksRead(ImmutableListMultimap.builder() .put("1", new DiskRange(0, 200)) .put("2", new DiskRange(400, 100)) .put("3", new DiskRange(700, 200)) From 8336278cb6c2d501e82a56f590e96d64d864726d Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Sun, 1 Jan 2023 22:26:15 +0530 Subject: [PATCH 2/3] Account for memory usage before reading from data source in parquet --- .../main/java/io/trino/parquet/AbstractParquetDataSource.java | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java index 0bfc3e3a0c1..7bf97d49a89 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/AbstractParquetDataSource.java @@ -326,9 +326,9 @@ public Slice read() if (data == null) { byte[] buffer = new byte[toIntExact(range.getLength())]; + readerMemoryUsage.setBytes(buffer.length); readFully(range.getOffset(), buffer, 0, buffer.length); data = Slices.wrappedBuffer(buffer); - readerMemoryUsage.setBytes(data.length()); } return data; From 0cc5c33f74f664d10b2983181d8532f8c0de67b3 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Thu, 29 Dec 2022 18:30:46 +0530 Subject: [PATCH 3/3] Avoid reading data in parquet ChunkedInputStream constructor Chunks should be read lazily, so that no data is read for a column that lazy loading of blocks ends up skipping due to a selective filter on a preceding column --- .../parquet/reader/ChunkedInputStream.java | 12 ++++--- .../reader/TestChunkedInputStream.java | 9 +++--- .../parquet/reader/TestParquetDataSource.java | 32 +++++++++++++++++++ 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java index 998f330844c..0bd40ccfae3 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ChunkedInputStream.java @@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkPositionIndexes; import static com.google.common.io.ByteStreams.readFully; +import static io.airlift.slice.Slices.EMPTY_SLICE; import static 
java.util.Objects.requireNonNull; /** @@ -41,21 +42,21 @@ public final class ChunkedInputStream { private final Iterator chunks; private ChunkReader currentChunkReader; - private BasicSliceInput current; + // current is explicitly initialized to EMPTY_SLICE as this field is set to null when the stream is closed + private BasicSliceInput current = EMPTY_SLICE.getInput(); public ChunkedInputStream(Collection chunks) { requireNonNull(chunks, "chunks is null"); checkArgument(!chunks.isEmpty(), "At least one chunk is expected but got none"); this.chunks = chunks.iterator(); - readNextChunk(); } public Slice getSlice(int length) throws IOException { if (length == 0) { - return Slices.EMPTY_SLICE; + return EMPTY_SLICE; } ensureOpen(); while (!current.isReadable()) { @@ -122,8 +123,9 @@ public void close() // already closed return; } - currentChunkReader.free(); - + if (currentChunkReader != null) { + currentChunkReader.free(); + } while (chunks.hasNext()) { chunks.next().free(); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java index b22a3e7c820..1153e759c93 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestChunkedInputStream.java @@ -81,13 +81,12 @@ public void testInput(List chunks) // verify available ChunkedInputStream availableInput = input(slices); - assertEquals(availableInput.available(), chunks.get(0).length); - availableInput.skipNBytes(chunks.get(0).length); + // nothing is read initially assertEquals(availableInput.available(), 0); - if (chunks.size() > 1) { + for (byte[] chunk : chunks) { availableInput.read(); - assertEquals(availableInput.available(), chunks.get(1).length - 1); - availableInput.skipNBytes(chunks.get(1).length - 1); + assertEquals(availableInput.available(), chunk.length - 1); + 
availableInput.skipNBytes(chunk.length - 1); assertEquals(availableInput.available(), 0); } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java index 28872232e31..ac7a1ac1a3d 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetDataSource.java @@ -28,6 +28,7 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.util.Map; import java.util.stream.IntStream; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; @@ -106,4 +107,35 @@ public void testMemoryAccounting() // both readers using merged reader are freed, all memory is released assertEquals(memoryContext.getBytes(), 0); } + + @Test + public void testChunkedInputStreamLazyLoading() + throws IOException + { + Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray()); + TestingParquetDataSource dataSource = new TestingParquetDataSource( + testingInput, + new ParquetReaderOptions() + .withMaxBufferSize(DataSize.ofBytes(500)) + .withMaxMergeDistance(DataSize.ofBytes(0))); + AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); + Map inputStreams = dataSource.planRead( + ImmutableListMultimap.builder() + .put("1", new DiskRange(0, 200)) + .put("1", new DiskRange(250, 50)) + .put("2", new DiskRange(400, 100)) + .put("2", new DiskRange(700, 200)) + .build(), + memoryContext); + assertThat(memoryContext.getBytes()).isEqualTo(0); + + inputStreams.get("1").getSlice(200); + assertThat(memoryContext.getBytes()).isEqualTo(200); + + inputStreams.get("2").getSlice(100); + assertThat(memoryContext.getBytes()).isEqualTo(200 + 100); + + inputStreams.get("1").close(); + assertThat(memoryContext.getBytes()).isEqualTo(100); + } }