From 1cf888bed1cb49f7f77d92bfa5a53661c3722a77 Mon Sep 17 00:00:00 2001 From: David Phillips Date: Tue, 9 Aug 2022 17:39:12 -0700 Subject: [PATCH 1/3] Remove unused constructor from ParquetPageSource --- .../io/trino/plugin/hive/parquet/ParquetPageSource.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java index de56ec42b302..df4a02ee7444 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java @@ -41,7 +41,6 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; import static java.lang.String.format; -import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; public class ParquetPageSource @@ -60,11 +59,6 @@ public class ParquetPageSource private boolean closed; private long completedPositions; - public ParquetPageSource(ParquetReader parquetReader, List types, List> fields) - { - this(parquetReader, types, nCopies(types.size(), false), fields); - } - /** * @param types Column types * @param rowIndexLocations Whether each column should be populated with the indices of its rows From c3002617920a390eddfc12690a7f1f2382e95905 Mon Sep 17 00:00:00 2001 From: David Phillips Date: Tue, 9 Aug 2022 17:51:45 -0700 Subject: [PATCH 2/3] Move factory method to TrinoColumnIndexStore --- .../parquet/reader/TrinoColumnIndexStore.java | 45 +++++++++++++++++++ .../parquet/ParquetPageSourceFactory.java | 43 +----------------- 2 files changed, 46 insertions(+), 42 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java index cfbb2e016790..32ebfaf1ffac 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java @@ -21,6 +21,11 @@ import io.trino.parquet.ChunkReader; import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetDataSource; +import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.RichColumnDescriptor; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.TupleDomain; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.Util; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -35,12 +40,15 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; import static java.util.Objects.requireNonNull; @@ -184,4 +192,41 @@ private PrimitiveType getPrimitiveType() return primitiveType; } } + + public static Optional getColumnIndexStore( + ParquetDataSource dataSource, + BlockMetaData blockMetadata, + Map, RichColumnDescriptor> descriptorsByPath, + TupleDomain parquetTupleDomain, + ParquetReaderOptions options) + { + if (!options.isUseColumnIndex() || parquetTupleDomain.isAll() || parquetTupleDomain.isNone()) { + return Optional.empty(); + } + + boolean hasColumnIndex = false; + for (ColumnChunkMetaData column : blockMetadata.getColumns()) { + if (column.getColumnIndexReference() != null && column.getOffsetIndexReference() != null) { + hasColumnIndex = true; + break; + } + } + + if (!hasColumnIndex) { + return Optional.empty(); + } + + Set columnsReadPaths = new HashSet<>(descriptorsByPath.size()); + for (List path : descriptorsByPath.keySet()) { + columnsReadPaths.add(ColumnPath.get(path.toArray(new String[0]))); + } + + Map parquetDomains = parquetTupleDomain.getDomains() + .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains")); + Set columnsFilteredPaths = parquetDomains.keySet().stream() + .map(column -> ColumnPath.get(column.getPath())) + .collect(toImmutableSet()); + + return Optional.of(new TrinoColumnIndexStore(dataSource, blockMetadata, columnsReadPaths, columnsFilteredPaths)); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index 0e2f44e1e331..94cc5ec39543 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -25,7 +25,6 @@ import io.trino.parquet.predicate.Predicate; import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.ParquetReader; -import io.trino.parquet.reader.TrinoColumnIndexStore; import io.trino.plugin.hive.AcidInfo; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HdfsEnvironment; @@ -50,8 +49,6 @@ import org.apache.hadoop.hdfs.BlockMissingException; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; @@ -64,7 +61,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -75,7 +71,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.nullToEmpty; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.parquet.ParquetTypeUtils.getColumnIO; import static io.trino.parquet.ParquetTypeUtils.getDescriptors; @@ -83,6 +78,7 @@ import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; +import static io.trino.parquet.reader.TrinoColumnIndexStore.getColumnIndexStore; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; @@ -365,43 +361,6 @@ public static Optional getColumnType(HiveColumnH return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type))); } - private static Optional getColumnIndexStore( - ParquetDataSource dataSource, - BlockMetaData blockMetadata, - Map, RichColumnDescriptor> descriptorsByPath, - TupleDomain parquetTupleDomain, - ParquetReaderOptions options) - { - if (!options.isUseColumnIndex() || parquetTupleDomain.isAll() || parquetTupleDomain.isNone()) { - return Optional.empty(); - } - - boolean hasColumnIndex = false; - for (ColumnChunkMetaData column : blockMetadata.getColumns()) { - if (column.getColumnIndexReference() != null && column.getOffsetIndexReference() != null) { - hasColumnIndex = true; - break; - } - } - - if (!hasColumnIndex) { - return Optional.empty(); - } - - Set columnsReadPaths = new HashSet<>(descriptorsByPath.size()); - for (List path : descriptorsByPath.keySet()) { - columnsReadPaths.add(ColumnPath.get(path.toArray(new String[0]))); - } - - Map parquetDomains = parquetTupleDomain.getDomains() - .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains")); - Set columnsFilteredPaths = parquetDomains.keySet().stream() - .map(column -> ColumnPath.get(column.getPath())) - .collect(toImmutableSet()); - - return Optional.of(new TrinoColumnIndexStore(dataSource, blockMetadata, columnsReadPaths, columnsFilteredPaths)); - } - public static TupleDomain getParquetTupleDomain( Map, RichColumnDescriptor> descriptorsByPath, TupleDomain effectivePredicate, From 1c32786bf4ff1cba4ffc59da245d48b1890a5e67 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 24 Jun 2022 18:46:14 +0900 Subject: [PATCH 3/3] Use Parquet column index when reading table in Iceberg --- .../trino/parquet/predicate/PredicateUtils.java | 6 ------ .../io/trino/parquet/reader/ParquetReader.java | 14 -------------- .../plugin/iceberg/IcebergPageSourceProvider.java | 15 ++++++++++++--- .../plugin/iceberg/IcebergSessionProperties.java | 11 +++++++++++ 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java index 91bc59ee1802..68a153c0c2d4 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java @@ -121,12 +121,6 @@ public static Predicate buildPredicate( return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build(), timeZone); } - public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map, RichColumnDescriptor> descriptorsByPath, TupleDomain parquetTupleDomain) - throws IOException - { - return predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty()); - } - public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map, RichColumnDescriptor> descriptorsByPath, TupleDomain parquetTupleDomain, Optional columnIndexStore) throws IOException { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index f907cf5dbf01..9ca9f6fd55b4 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -125,20 +125,6 @@ public class ParquetReader private final Map paths = new HashMap<>(); private final Map> codecMetrics; - public ParquetReader( - Optional fileCreatedBy, - MessageColumnIO messageColumnIO, - List blocks, - List firstRowsOfBlocks, - ParquetDataSource dataSource, - DateTimeZone timeZone, - AggregatedMemoryContext memoryContext, - ParquetReaderOptions options) - throws IOException - { - this(fileCreatedBy, messageColumnIO, blocks, firstRowsOfBlocks, dataSource, timeZone, memoryContext, options, null, null); - } - public ParquetReader( Optional fileCreatedBy, MessageColumnIO messageColumnIO, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 26e0176cf99f..b46df00bbf2b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -111,6 +111,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; import org.apache.parquet.io.ColumnIO; import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.schema.MessageType; @@ -150,6 +151,7 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; +import static io.trino.parquet.reader.TrinoColumnIndexStore.getColumnIndexStore; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_FILE_RECORD_COUNT; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID; @@ -170,6 +172,7 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetMaxReadBlockSize; import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy; +import static io.trino.plugin.iceberg.IcebergSessionProperties.isParquetUseColumnIndex; import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata; import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; @@ -615,7 +618,8 @@ public ReaderPageSourceWithRowPositions createDataPageSource( partitionData, dataColumns, parquetReaderOptions - .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)), + .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)) + .withUseColumnIndex(isParquetUseColumnIndex(session)), predicate, fileFormatDataSourceStats, nameMapping, @@ -1062,12 +1066,15 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( Optional endRowPosition = Optional.empty(); ImmutableList.Builder blockStarts = ImmutableList.builder(); List blocks = new ArrayList<>(); + ImmutableList.Builder> columnIndexes = ImmutableList.builder(); for (BlockMetaData block : parquetMetadata.getBlocks()) { long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); + Optional columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options); if (start <= firstDataPage && firstDataPage < start + length && - predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) { + predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndex)) { blocks.add(block); blockStarts.add(nextStart); + columnIndexes.add(columnIndex); if (startRowPosition.isEmpty()) { startRowPosition = Optional.of(nextStart); } @@ -1085,7 +1092,9 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( dataSource, UTC, memoryContext, - options); + options, + parquetPredicate, + columnIndexes.build()); ConstantPopulatingPageSource.Builder constantPopulatingPageSourceBuilder = ConstantPopulatingPageSource.builder(); int parquetSourceChannel = 0; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index 40b9b1027f59..2ef29ba2ba1c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -65,6 +65,7 @@ public final class IcebergSessionProperties private static final String ORC_WRITER_MAX_STRIPE_ROWS = "orc_writer_max_stripe_rows"; private static final String ORC_WRITER_MAX_DICTIONARY_MEMORY = "orc_writer_max_dictionary_memory"; private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; + private static final String PARQUET_USE_COLUMN_INDEX = "parquet_use_column_index"; private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size"; @@ -189,6 +190,11 @@ public IcebergSessionProperties( "Parquet: Maximum size of a block to read", parquetReaderConfig.getMaxReadBlockSize(), false)) + .add(booleanProperty( + PARQUET_USE_COLUMN_INDEX, + "Parquet: Use Parquet column index", + parquetReaderConfig.isUseColumnIndex(), + false)) .add(dataSizeProperty( PARQUET_WRITER_BLOCK_SIZE, "Parquet: Writer block size", @@ -357,6 +363,11 @@ public static DataSize getParquetMaxReadBlockSize(ConnectorSession session) return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class); } + public static boolean isParquetUseColumnIndex(ConnectorSession session) + { + return session.getProperty(PARQUET_USE_COLUMN_INDEX, Boolean.class); + } + public static DataSize getParquetWriterPageSize(ConnectorSession session) { return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class);