diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetWriteValidation.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetWriteValidation.java
index 83a712efb19f..832064dd6672 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetWriteValidation.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetWriteValidation.java
@@ -18,6 +18,7 @@
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.airlift.slice.XxHash64;
+import io.trino.parquet.reader.RowGroupInfo;
 import io.trino.spi.Page;
 import io.trino.spi.block.Block;
 import io.trino.spi.type.Type;
@@ -126,17 +127,17 @@ public void validateColumns(ParquetDataSourceId dataSourceId, MessageType schema
         }
     }
 
-    public void validateBlocksMetadata(ParquetDataSourceId dataSourceId, List<BlockMetaData> blocksMetaData)
+    public void validateBlocksMetadata(ParquetDataSourceId dataSourceId, List<RowGroupInfo> rowGroupInfos)
             throws ParquetCorruptionException
     {
         validateParquet(
-                blocksMetaData.size() == rowGroups.size(),
+                rowGroupInfos.size() == rowGroups.size(),
                 dataSourceId,
                 "Number of row groups %d did not match %d",
-                blocksMetaData.size(),
+                rowGroupInfos.size(),
                 rowGroups.size());
-        for (int rowGroupIndex = 0; rowGroupIndex < blocksMetaData.size(); rowGroupIndex++) {
-            BlockMetaData block = blocksMetaData.get(rowGroupIndex);
+        for (int rowGroupIndex = 0; rowGroupIndex < rowGroupInfos.size(); rowGroupIndex++) {
+            BlockMetaData block = rowGroupInfos.get(rowGroupIndex).blockMetaData();
             RowGroup rowGroup = rowGroups.get(rowGroupIndex);
             validateParquet(
                     block.getRowCount() == rowGroup.getNum_rows(),
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java
index 3b507aa24685..726c43dabdef 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java
@@ -23,6 +23,8 @@
 import io.trino.parquet.ParquetDataSource;
 import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetEncoding;
+import io.trino.parquet.ParquetReaderOptions;
+import io.trino.parquet.reader.RowGroupInfo;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Type;
@@ -50,9 +52,11 @@
 import java.util.Optional;
 import java.util.Set;
 
+import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
 import static io.trino.parquet.ParquetCompressionUtils.decompress;
 import static io.trino.parquet.ParquetReaderUtils.isOnlyDictionaryEncodingPages;
 import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding;
+import static io.trino.parquet.reader.TrinoColumnIndexStore.getColumnIndexStore;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.DateType.DATE;
 import static io.trino.spi.type.IntegerType.INTEGER;
@@ -172,6 +176,50 @@ public static boolean predicateMatches(
                 columnIndexStore);
     }
 
+    public static List<RowGroupInfo> getFilteredRowGroups(
+            long splitStart,
+            long splitLength,
+            ParquetDataSource dataSource,
+            List<BlockMetaData> blocksMetaData,
+            List<TupleDomain<ColumnDescriptor>> parquetTupleDomains,
+            List<TupleDomainParquetPredicate> parquetPredicates,
+            Map<List<String>, ColumnDescriptor> descriptorsByPath,
+            DateTimeZone timeZone,
+            int domainCompactionThreshold,
+            ParquetReaderOptions options)
+            throws IOException
+    {
+        long fileRowCount = 0;
+        ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
+        for (BlockMetaData block : blocksMetaData) {
+            long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+            boolean splitContainsBlock = splitStart <= firstDataPage && firstDataPage < splitStart + splitLength;
+            if (splitContainsBlock) {
+                for (int i = 0; i < parquetTupleDomains.size(); i++) {
+                    TupleDomain<ColumnDescriptor> parquetTupleDomain = parquetTupleDomains.get(i);
+                    TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i);
+                    Optional<ColumnIndexStore> columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options);
+                    Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);
+                    if (predicateMatches(
+                            parquetPredicate,
+                            block,
+                            dataSource,
+                            descriptorsByPath,
+                            parquetTupleDomain,
+                            columnIndex,
+                            bloomFilterStore,
+                            timeZone,
+                            domainCompactionThreshold)) {
+                        rowGroupInfoBuilder.add(new RowGroupInfo(block, fileRowCount, columnIndex));
+                        break;
+                    }
+                }
+            }
+            fileRowCount += block.getRowCount();
+        }
+        return rowGroupInfoBuilder.build();
+    }
+
     private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData blockMetadata, Map<List<String>, ColumnDescriptor> descriptorsByPath)
     {
         ImmutableMap.Builder<ColumnDescriptor, Statistics<?>> statistics = ImmutableMap.builder();
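Note: getFilteredRowGroups folds the split-containment check, column-index and bloom-filter loading, and predicate matching into one helper, replacing the near-identical loops each connector carried (see the connector diffs below). A minimal sketch of the single-predicate call shape, as the Hudi and Iceberg call sites use it; the locals (dataSource, parquetMetadata, descriptorsByPath, parquetTupleDomain, parquetPredicate, options) are assumed to be built from the file footer as usual:

    // One tuple domain paired with one predicate. Each surviving RowGroupInfo
    // carries its BlockMetaData, the cumulative row offset of the group within
    // the file, and the ColumnIndexStore loaded while matching the predicate.
    List<RowGroupInfo> rowGroups = getFilteredRowGroups(
            splitStart,
            splitLength,
            dataSource,
            parquetMetadata.getBlocks(),
            ImmutableList.of(parquetTupleDomain),
            ImmutableList.of(parquetPredicate),
            descriptorsByPath,
            DateTimeZone.UTC,
            domainCompactionThreshold,
            options);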
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
index 914942f5c77b..aa9633f2763b 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
@@ -80,7 +80,6 @@
 import static java.lang.Math.min;
 import static java.lang.Math.toIntExact;
 import static java.lang.String.format;
-import static java.util.Collections.nCopies;
 import static java.util.Objects.requireNonNull;
 
 public class ParquetReader
@@ -94,8 +93,7 @@ public class ParquetReader
     public static final String COLUMN_INDEX_ROWS_FILTERED = "ParquetColumnIndexRowsFiltered";
 
     private final Optional<String> fileCreatedBy;
-    private final List<BlockMetaData> blocks;
-    private final List<Long> firstRowsOfBlocks;
+    private final List<RowGroupInfo> rowGroups;
     private final List<Column> columnFields;
     private final List<PrimitiveField> primitiveFields;
     private final ParquetDataSource dataSource;
@@ -123,7 +121,6 @@ public class ParquetReader
     private AggregatedMemoryContext currentRowGroupMemoryContext;
 
     private final Map<ChunkKey, ChunkReader> chunkReaders;
-    private final List<Optional<ColumnIndexStore>> columnIndexStore;
     private final Optional<ParquetWriteValidation> writeValidation;
     private final Optional<WriteChecksumBuilder> writeChecksumBuilder;
     private final Optional<StatisticsValidation> rowGroupStatisticsValidation;
@@ -136,30 +133,13 @@ public class ParquetReader
     public ParquetReader(
             Optional<String> fileCreatedBy,
             List<Column> columnFields,
-            List<BlockMetaData> blocks,
-            List<Long> firstRowsOfBlocks,
-            ParquetDataSource dataSource,
-            DateTimeZone timeZone,
-            AggregatedMemoryContext memoryContext,
-            ParquetReaderOptions options,
-            Function<Exception, RuntimeException> exceptionTransform)
-            throws IOException
-    {
-        this(fileCreatedBy, columnFields, blocks, firstRowsOfBlocks, dataSource, timeZone, memoryContext, options, exceptionTransform, Optional.empty(), nCopies(blocks.size(), Optional.empty()), Optional.empty());
-    }
-
-    public ParquetReader(
-            Optional<String> fileCreatedBy,
-            List<Column> columnFields,
-            List<BlockMetaData> blocks,
-            List<Long> firstRowsOfBlocks,
+            List<RowGroupInfo> rowGroups,
             ParquetDataSource dataSource,
             DateTimeZone timeZone,
             AggregatedMemoryContext memoryContext,
             ParquetReaderOptions options,
             Function<Exception, RuntimeException> exceptionTransform,
             Optional<TupleDomainParquetPredicate> parquetPredicate,
-            List<Optional<ColumnIndexStore>> columnIndexStore,
             Optional<ParquetWriteValidation> writeValidation)
             throws IOException
     {
@@ -167,8 +147,7 @@ public ParquetReader(
         requireNonNull(columnFields, "columnFields is null");
         this.columnFields = ImmutableList.copyOf(columnFields);
         this.primitiveFields = getPrimitiveFields(columnFields.stream().map(Column::field).collect(toImmutableList()));
-        this.blocks = requireNonNull(blocks, "blocks is null");
-        this.firstRowsOfBlocks = requireNonNull(firstRowsOfBlocks, "firstRowsOfBlocks is null");
+        this.rowGroups = requireNonNull(rowGroups, "rowGroups is null");
         this.dataSource = requireNonNull(dataSource, "dataSource is null");
         this.columnReaderFactory = new ColumnReaderFactory(timeZone);
         this.memoryContext = requireNonNull(memoryContext, "memoryContext is null");
@@ -178,31 +157,28 @@ public ParquetReader(
         this.columnReaders = new HashMap<>();
         this.maxBytesPerCell = new HashMap<>();
 
-        checkArgument(blocks.size() == firstRowsOfBlocks.size(), "elements of firstRowsOfBlocks must correspond to blocks");
-
         this.writeValidation = requireNonNull(writeValidation, "writeValidation is null");
         validateWrite(
                 validation -> fileCreatedBy.equals(Optional.of(validation.getCreatedBy())),
                 "Expected created by %s, found %s",
                 writeValidation.map(ParquetWriteValidation::getCreatedBy),
                 fileCreatedBy);
-        validateBlockMetadata(blocks);
+        validateBlockMetadata(rowGroups);
         this.writeChecksumBuilder = writeValidation.map(validation -> createWriteChecksumBuilder(validation.getTypes()));
         this.rowGroupStatisticsValidation = writeValidation.map(validation -> createStatisticsValidationBuilder(validation.getTypes()));
 
         requireNonNull(parquetPredicate, "parquetPredicate is null");
-        this.columnIndexStore = requireNonNull(columnIndexStore, "columnIndexStore is null");
         Optional<FilterPredicate> filter = Optional.empty();
         if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
             filter = parquetPredicate.get().toParquetFilter(timeZone);
         }
-        this.blockRowRanges = calculateFilteredRowRanges(blocks, filter, columnIndexStore, primitiveFields);
+        this.blockRowRanges = calculateFilteredRowRanges(rowGroups, filter, primitiveFields);
 
         this.blockFactory = new ParquetBlockFactory(exceptionTransform);
         ListMultimap<ChunkKey, DiskRange> ranges = ArrayListMultimap.create();
         Map codecMetrics = new HashMap<>();
-        for (int rowGroup = 0; rowGroup < blocks.size(); rowGroup++) {
-            BlockMetaData metadata = blocks.get(rowGroup);
+        for (int rowGroup = 0; rowGroup < rowGroups.size(); rowGroup++) {
+            BlockMetaData metadata = rowGroups.get(rowGroup).blockMetaData();
             for (PrimitiveField field : primitiveFields) {
                 int columnId = field.getId();
                 ColumnChunkMetaData chunkMetadata = getColumnChunkMetaData(metadata, field.getDescriptor());
@@ -317,11 +293,12 @@ private boolean advanceToNextRowGroup()
         }
 
         currentRowGroup++;
-        if (currentRowGroup == blocks.size()) {
+        if (currentRowGroup == rowGroups.size()) {
             return false;
         }
-        currentBlockMetadata = blocks.get(currentRowGroup);
-        firstRowIndexInGroup = firstRowsOfBlocks.get(currentRowGroup);
+        RowGroupInfo rowGroupInfo = rowGroups.get(currentRowGroup);
+        currentBlockMetadata = rowGroupInfo.blockMetaData();
+        firstRowIndexInGroup = rowGroupInfo.fileRowOffset();
         currentGroupRowCount = currentBlockMetadata.getRowCount();
         FilteredRowRanges currentGroupRowRanges = blockRowRanges[currentRowGroup];
         log.debug("advanceToNextRowGroup dataSource %s, currentRowGroup %d, rowRanges %s, currentBlockMetadata %s", dataSource.getId(), currentRowGroup, currentGroupRowRanges, currentBlockMetadata);
@@ -450,7 +427,7 @@ private static Block toNotNullSupressedBlock(int positionCount, boolean[] rowIsN
     @Nullable
     private FilteredOffsetIndex getFilteredOffsetIndex(FilteredRowRanges rowRanges, int rowGroup, long rowGroupRowCount, ColumnPath columnPath)
     {
-        Optional<ColumnIndexStore> rowGroupColumnIndexStore = this.columnIndexStore.get(rowGroup);
+        Optional<ColumnIndexStore> rowGroupColumnIndexStore = this.rowGroups.get(rowGroup).columnIndexStore();
         if (rowGroupColumnIndexStore.isEmpty()) {
             return null;
         }
@@ -587,24 +564,24 @@ public AggregatedMemoryContext getMemoryContext()
     }
 
     private static FilteredRowRanges[] calculateFilteredRowRanges(
-            List<BlockMetaData> blocks,
+            List<RowGroupInfo> rowGroups,
             Optional<FilterPredicate> filter,
-            List<Optional<ColumnIndexStore>> columnIndexStore,
             List<PrimitiveField> primitiveFields)
     {
-        FilteredRowRanges[] blockRowRanges = new FilteredRowRanges[blocks.size()];
+        FilteredRowRanges[] blockRowRanges = new FilteredRowRanges[rowGroups.size()];
         if (filter.isEmpty()) {
             return blockRowRanges;
         }
         Set<ColumnPath> paths = primitiveFields.stream()
                 .map(field -> ColumnPath.get(field.getDescriptor().getPath()))
                 .collect(toImmutableSet());
-        for (int rowGroup = 0; rowGroup < blocks.size(); rowGroup++) {
-            Optional<ColumnIndexStore> rowGroupColumnIndexStore = columnIndexStore.get(rowGroup);
+        for (int rowGroup = 0; rowGroup < rowGroups.size(); rowGroup++) {
+            RowGroupInfo rowGroupInfo = rowGroups.get(rowGroup);
+            Optional<ColumnIndexStore> rowGroupColumnIndexStore = rowGroupInfo.columnIndexStore();
             if (rowGroupColumnIndexStore.isEmpty()) {
                 continue;
             }
-            BlockMetaData metadata = blocks.get(rowGroup);
+            BlockMetaData metadata = rowGroupInfo.blockMetaData();
             long rowGroupRowCount = metadata.getRowCount();
             FilteredRowRanges rowRanges = new FilteredRowRanges(ColumnIndexFilter.calculateRowRanges(
                     FilterCompat.get(filter.get()),
@@ -627,11 +604,11 @@ private void validateWritePageChecksum(Page page)
         }
     }
 
-    private void validateBlockMetadata(List<BlockMetaData> blockMetaData)
+    private void validateBlockMetadata(List<RowGroupInfo> rowGroups)
             throws ParquetCorruptionException
     {
         if (writeValidation.isPresent()) {
-            writeValidation.get().validateBlocksMetadata(dataSource.getId(), blockMetaData);
+            writeValidation.get().validateBlocksMetadata(dataSource.getId(), rowGroups);
         }
     }
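Note: with the delegating convenience constructor removed, every ParquetReader caller now spells out the predicate and write-validation arguments; non-filtering callers pass Optional.empty() for both and RowGroupInfos built with empty column index stores. A sketch of the minimal call under those assumptions (fileMetaData, columnFields, rowGroupInfos, dataSource, and memoryContext are assumed to come from the caller, and default ParquetReaderOptions are assumed):

    ParquetReader reader = new ParquetReader(
            Optional.ofNullable(fileMetaData.getCreatedBy()),
            columnFields,               // List<Column> resolved against the file schema
            rowGroupInfos,              // one RowGroupInfo per row group to read
            dataSource,
            DateTimeZone.UTC,
            memoryContext,
            new ParquetReaderOptions(), // assumed defaults
            RuntimeException::new,      // exceptionTransform
            Optional.empty(),           // no predicate, so no column-index page pruning
            Optional.empty());          // no write validation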
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/RowGroupInfo.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/RowGroupInfo.java
new file mode 100644
index 000000000000..d9fdf9e735d4
--- /dev/null
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/RowGroupInfo.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.parquet.reader;
+
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
+
+import java.util.Optional;
+
+public record RowGroupInfo(BlockMetaData blockMetaData, long fileRowOffset, Optional<ColumnIndexStore> columnIndexStore) {}
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
index c9ddb8e7f94b..5b813482625f 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
@@ -26,6 +26,7 @@
 import io.trino.parquet.ParquetWriteValidation;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.ParquetReader;
+import io.trino.parquet.reader.RowGroupInfo;
 import io.trino.parquet.writer.ColumnWriter.BufferData;
 import io.trino.spi.Page;
 import io.trino.spi.type.Type;
@@ -69,7 +70,6 @@
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 import static java.nio.charset.StandardCharsets.US_ASCII;
-import static java.util.Collections.nCopies;
 import static java.util.Objects.requireNonNull;
 import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
 
@@ -252,17 +252,15 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada
                             .orElseThrow()));
         }
         long nextStart = 0;
-        ImmutableList.Builder<Long> blockStartsBuilder = ImmutableList.builder();
+        ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
         for (BlockMetaData block : parquetMetadata.getBlocks()) {
-            blockStartsBuilder.add(nextStart);
+            rowGroupInfoBuilder.add(new RowGroupInfo(block, nextStart, Optional.empty()));
             nextStart += block.getRowCount();
         }
-        List<Long> blockStarts = blockStartsBuilder.build();
         return new ParquetReader(
                 Optional.ofNullable(fileMetaData.getCreatedBy()),
                 columnFields.build(),
-                parquetMetadata.getBlocks(),
-                blockStarts,
+                rowGroupInfoBuilder.build(),
                 input,
                 parquetTimeZone.orElseThrow(),
                 newSimpleAggregatedMemoryContext(),
@@ -272,7 +270,6 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada
                     return new RuntimeException(exception);
                 },
                 Optional.empty(),
-                nCopies(blockStarts.size(), Optional.empty()),
                 Optional.of(writeValidation));
     }
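Note: RowGroupInfo is a plain record, so blockMetaData(), fileRowOffset(), and columnIndexStore() are generated accessors. The invariant callers must maintain is that fileRowOffset equals the cumulative row count of all preceding row groups in the file; the offset advances for every group, including ones a filtering caller later drops, which is why getFilteredRowGroups increments fileRowCount outside its predicate check. The bookkeeping in miniature, matching the loop in createParquetReader above:

    long nextStart = 0;
    ImmutableList.Builder<RowGroupInfo> rowGroupInfos = ImmutableList.builder();
    for (BlockMetaData block : parquetMetadata.getBlocks()) {
        rowGroupInfos.add(new RowGroupInfo(block, nextStart, Optional.empty()));
        nextStart += block.getRowCount(); // advances whether or not the group is kept
    }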
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java
index cceb67158af1..47dcbd151d25 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java
@@ -18,6 +18,7 @@
 import io.airlift.slice.Slices;
 import io.trino.memory.context.AggregatedMemoryContext;
 import io.trino.parquet.reader.ParquetReader;
+import io.trino.parquet.reader.RowGroupInfo;
 import io.trino.parquet.writer.ParquetSchemaConverter;
 import io.trino.parquet.writer.ParquetWriter;
 import io.trino.parquet.writer.ParquetWriterOptions;
@@ -55,7 +56,6 @@
 import static io.trino.spi.block.MapBlock.fromKeyValueBlock;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.TypeUtils.writeNativeValue;
-import static java.util.Collections.nCopies;
 import static org.joda.time.DateTimeZone.UTC;
 
 public class ParquetTestUtils
@@ -114,17 +114,15 @@ public static ParquetReader createParquetReader(
                             .orElseThrow()));
         }
         long nextStart = 0;
-        ImmutableList.Builder<Long> blockStartsBuilder = ImmutableList.builder();
+        ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
         for (BlockMetaData block : parquetMetadata.getBlocks()) {
-            blockStartsBuilder.add(nextStart);
+            rowGroupInfoBuilder.add(new RowGroupInfo(block, nextStart, Optional.empty()));
             nextStart += block.getRowCount();
         }
-        List<Long> blockStarts = blockStartsBuilder.build();
         return new ParquetReader(
                 Optional.ofNullable(fileMetaData.getCreatedBy()),
                 columnFields.build(),
-                parquetMetadata.getBlocks(),
-                blockStarts,
+                rowGroupInfoBuilder.build(),
                 input,
                 UTC,
                 memoryContext,
@@ -134,7 +132,6 @@ public static ParquetReader createParquetReader(
                     return new RuntimeException(exception);
                 },
                 Optional.empty(),
-                nCopies(blockStarts.size(), Optional.empty()),
                 Optional.empty());
     }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index ef443a65e98d..375e615fd5d6 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -22,7 +22,6 @@
 import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.filesystem.TrinoInputFile;
 import io.trino.memory.context.AggregatedMemoryContext;
-import io.trino.parquet.BloomFilterStore;
 import io.trino.parquet.Column;
 import io.trino.parquet.Field;
 import io.trino.parquet.ParquetCorruptionException;
@@ -33,6 +32,7 @@
 import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.ParquetReader;
+import io.trino.parquet.reader.RowGroupInfo;
 import io.trino.plugin.hive.AcidInfo;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
 import io.trino.plugin.hive.HiveColumnHandle;
@@ -49,10 +49,8 @@
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.TupleDomain;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
@@ -71,15 +69,13 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
-import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
 import static io.trino.parquet.ParquetTypeUtils.constructField;
 import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
 import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
 import static io.trino.parquet.ParquetTypeUtils.getParquetTypeByName;
 import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName;
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
-import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
-import static io.trino.parquet.reader.TrinoColumnIndexStore.getColumnIndexStore;
+import static io.trino.parquet.predicate.PredicateUtils.getFilteredRowGroups;
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
@@ -249,36 +245,17 @@ public static ReaderPageSource createPageSource(
             parquetPredicates = parquetPredicatesBuilder.build();
         }
 
-        long nextStart = 0;
-        ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
-        ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
-        ImmutableList.Builder<Optional<ColumnIndexStore>> columnIndexes = ImmutableList.builder();
-        for (BlockMetaData block : parquetMetadata.getBlocks()) {
-            long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-            for (int i = 0; i < parquetTupleDomains.size(); i++) {
-                TupleDomain<ColumnDescriptor> parquetTupleDomain = parquetTupleDomains.get(i);
-                TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i);
-                Optional<ColumnIndexStore> columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options);
-                Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);
-                if (start <= firstDataPage && firstDataPage < start + length
-                        && predicateMatches(
-                                parquetPredicate,
-                                block,
-                                dataSource,
-                                descriptorsByPath,
-                                parquetTupleDomain,
-                                columnIndex,
-                                bloomFilterStore,
-                                timeZone,
-                                domainCompactionThreshold)) {
-                    blocks.add(block);
-                    blockStarts.add(nextStart);
-                    columnIndexes.add(columnIndex);
-                    break;
-                }
-            }
-            nextStart += block.getRowCount();
-        }
+        List<RowGroupInfo> rowGroups = getFilteredRowGroups(
+                start,
+                length,
+                dataSource,
+                parquetMetadata.getBlocks(),
+                parquetTupleDomains,
+                parquetPredicates,
+                descriptorsByPath,
+                timeZone,
+                domainCompactionThreshold,
+                options);
 
         Optional<ReaderColumns> readerProjections = projectBaseColumns(columns, useColumnNames);
         List<HiveColumnHandle> baseColumns = readerProjections.map(projection ->
@@ -292,8 +269,7 @@ && predicateMatches(
         ParquetReaderProvider parquetReaderProvider = fields -> new ParquetReader(
                 Optional.ofNullable(fileMetaData.getCreatedBy()),
                 fields,
-                blocks.build(),
-                blockStarts.build(),
+                rowGroups,
                 finalDataSource,
                 timeZone,
                 memoryContext,
@@ -302,7 +278,6 @@ && predicateMatches(
                 // We avoid using disjuncts of parquetPredicate for page pruning in ParquetReader as currently column indexes
                 // are not present in the Parquet files which are read with disjunct predicates.
                 parquetPredicates.size() == 1 ? Optional.of(parquetPredicates.get(0)) : Optional.empty(),
-                columnIndexes.build(),
                 parquetWriteValidation);
         ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
         return new ReaderPageSource(parquetPageSource, readerProjections);
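Note: the Hive factory is the only call site passing more than one domain/predicate pair (one per disjunct of the effective predicate), and getFilteredRowGroups keeps a row group on the first pair that matches. Only the single-disjunct case forwards the predicate into ParquetReader, where it can become a page-level filter; paraphrasing the reader constructor from the diff above:

    // Inside the ParquetReader constructor: a forwarded predicate turns into a
    // Parquet FilterPredicate only when column-index use is enabled in the options.
    Optional<FilterPredicate> filter = Optional.empty();
    if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
        filter = parquetPredicate.get().toParquetFilter(timeZone);
    }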
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
index 9c29179861c7..080c51746d49 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
@@ -27,6 +27,7 @@
 import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.ParquetReader;
+import io.trino.parquet.reader.RowGroupInfo;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HivePartitionKey;
@@ -47,10 +48,8 @@
 import io.trino.spi.type.Decimals;
 import io.trino.spi.type.TypeSignature;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.MessageType;
 import org.joda.time.DateTimeZone;
@@ -74,8 +73,7 @@
 import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
 import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
-import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
-import static io.trino.parquet.reader.TrinoColumnIndexStore.getColumnIndexStore;
+import static io.trino.parquet.predicate.PredicateUtils.getFilteredRowGroups;
 import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
 import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.ParquetReaderProvider;
 import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource;
@@ -214,21 +212,17 @@ private static ConnectorPageSource createPageSource(
             TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);
 
-            long nextStart = 0;
-            ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
-            ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
-            ImmutableList.Builder<Optional<ColumnIndexStore>> columnIndexes = ImmutableList.builder();
-            for (BlockMetaData block : parquetMetadata.getBlocks()) {
-                long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-                Optional<ColumnIndexStore> columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options);
-                if (start <= firstDataPage && firstDataPage < start + length
-                        && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndex, Optional.empty(), timeZone, DOMAIN_COMPACTION_THRESHOLD)) {
-                    blocks.add(block);
-                    blockStarts.add(nextStart);
-                    columnIndexes.add(columnIndex);
-                }
-                nextStart += block.getRowCount();
-            }
+            List<RowGroupInfo> rowGroups = getFilteredRowGroups(
+                    start,
+                    length,
+                    dataSource,
+                    parquetMetadata.getBlocks(),
+                    ImmutableList.of(parquetTupleDomain),
+                    ImmutableList.of(parquetPredicate),
+                    descriptorsByPath,
+                    timeZone,
+                    DOMAIN_COMPACTION_THRESHOLD,
+                    options);
 
             Optional<ReaderColumns> readerProjections = projectBaseColumns(columns);
             List<HiveColumnHandle> baseColumns = readerProjections.map(projection ->
@@ -241,15 +235,13 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
             ParquetReaderProvider parquetReaderProvider = fields -> new ParquetReader(
                     Optional.ofNullable(fileMetaData.getCreatedBy()),
                     fields,
-                    blocks.build(),
-                    blockStarts.build(),
+                    rowGroups,
                     finalDataSource,
                     timeZone,
                     memoryContext,
                     options,
                     exception -> handleException(dataSourceId, exception),
                     Optional.of(parquetPredicate),
-                    columnIndexes.build(),
                     Optional.empty());
             return createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
         }
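Note: routing Hudi through the shared helper also appears to change behavior slightly: the old inline loop passed Optional.empty() for the bloom filter store, whereas getFilteredRowGroups always consults getBloomFilterStore, so Hudi splits can now be pruned by bloom filters where the file provides them. The relevant helper lines, repeated from the PredicateUtils diff:

    Optional<ColumnIndexStore> columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options);
    Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);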
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index cd49d7f11366..19bde2a57078 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -38,7 +38,6 @@
 import io.trino.orc.OrcRecordReader;
 import io.trino.orc.TupleDomainOrcPredicate;
 import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder;
-import io.trino.parquet.BloomFilterStore;
 import io.trino.parquet.Column;
 import io.trino.parquet.Field;
 import io.trino.parquet.ParquetCorruptionException;
@@ -48,6 +47,7 @@
 import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.ParquetReader;
+import io.trino.parquet.reader.RowGroupInfo;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
 import io.trino.plugin.hive.ReaderColumns;
 import io.trino.plugin.hive.ReaderPageSource;
@@ -109,7 +109,6 @@
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.ColumnIO;
@@ -145,11 +144,10 @@
 import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
 import static io.trino.orc.OrcReader.ProjectedLayout;
 import static io.trino.orc.OrcReader.fullyProjectedLayout;
-import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
 import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
 import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
-import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
+import static io.trino.parquet.predicate.PredicateUtils.getFilteredRowGroups;
 import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID;
@@ -586,7 +584,9 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
                         .withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
                         .withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
                         .withSmallFileThreshold(getParquetSmallFileThreshold(session))
-                        .withBloomFilter(useParquetBloomFilter(session)),
+                        .withBloomFilter(useParquetBloomFilter(session))
+                        // TODO https://github.com/trinodb/trino/issues/11000
+                        .withUseColumnIndex(false),
                 predicate,
                 fileFormatDataSourceStats,
                 nameMapping,
@@ -960,25 +960,23 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
             TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
             TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);
 
-            long nextStart = 0;
+            List<RowGroupInfo> rowGroups = getFilteredRowGroups(
+                    start,
+                    length,
+                    dataSource,
+                    parquetMetadata.getBlocks(),
+                    ImmutableList.of(parquetTupleDomain),
+                    ImmutableList.of(parquetPredicate),
+                    descriptorsByPath,
+                    UTC,
+                    ICEBERG_DOMAIN_COMPACTION_THRESHOLD,
+                    options);
             Optional<Long> startRowPosition = Optional.empty();
             Optional<Long> endRowPosition = Optional.empty();
-            ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
-            List<BlockMetaData> blocks = new ArrayList<>();
-            for (BlockMetaData block : parquetMetadata.getBlocks()) {
-                long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-                Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);
-
-                if (start <= firstDataPage && firstDataPage < start + length &&
-                        predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), bloomFilterStore, UTC, ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
-                    blocks.add(block);
-                    blockStarts.add(nextStart);
-                    if (startRowPosition.isEmpty()) {
-                        startRowPosition = Optional.of(nextStart);
-                    }
-                    endRowPosition = Optional.of(nextStart + block.getRowCount());
-                }
-                nextStart += block.getRowCount();
+            if (!rowGroups.isEmpty()) {
+                startRowPosition = Optional.of(rowGroups.get(0).fileRowOffset());
+                RowGroupInfo lastRowGroup = rowGroups.get(rowGroups.size() - 1);
+                endRowPosition = Optional.of(lastRowGroup.fileRowOffset() + lastRowGroup.blockMetaData().getRowCount());
             }
 
             MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
@@ -1041,13 +1039,14 @@ else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
             ParquetReader parquetReader = new ParquetReader(
                     Optional.ofNullable(fileMetaData.getCreatedBy()),
                     parquetColumnFieldsBuilder.build(),
-                    blocks,
-                    blockStarts.build(),
+                    rowGroups,
                     dataSource,
                     UTC,
                     memoryContext,
                     options,
-                    exception -> handleException(dataSourceId, exception));
+                    exception -> handleException(dataSourceId, exception),
+                    Optional.empty(),
+                    Optional.empty());
             return new ReaderPageSourceWithRowPositions(
                     new ReaderPageSource(
                             pageSourceBuilder.build(parquetReader),