diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/DictionaryDescriptor.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/DictionaryDescriptor.java index 6c1d00042d59..ea5ccef5591a 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/DictionaryDescriptor.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/DictionaryDescriptor.java @@ -21,11 +21,13 @@ public class DictionaryDescriptor { private final ColumnDescriptor columnDescriptor; + private final boolean nullAllowed; private final Optional dictionaryPage; - public DictionaryDescriptor(ColumnDescriptor columnDescriptor, Optional dictionaryPage) + public DictionaryDescriptor(ColumnDescriptor columnDescriptor, boolean nullAllowed, Optional dictionaryPage) { this.columnDescriptor = columnDescriptor; + this.nullAllowed = nullAllowed; this.dictionaryPage = dictionaryPage; } @@ -34,6 +36,11 @@ public ColumnDescriptor getColumnDescriptor() return columnDescriptor; } + public boolean isNullAllowed() + { + return nullAllowed; + } + public Optional getDictionaryPage() { return dictionaryPage; diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/Predicate.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/Predicate.java index 413c2f6faeb4..1b0354303184 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/Predicate.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/Predicate.java @@ -21,20 +21,28 @@ import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; import org.joda.time.DateTimeZone; +import java.util.List; import java.util.Map; import java.util.Optional; public interface Predicate { /** - * Should the Parquet Reader process a file section with the specified statistics. 
+ * Should the Parquet Reader process a file section with the specified statistics, + * and if it should, then return the columns that are candidates for further inspection of more + * granular statistics from column index and dictionary. * * @param numberOfRows the number of rows in the segment; this can be used with * Statistics to determine if a column is only null * @param statistics column statistics * @param id Parquet file name + * + * @return Optional.empty() if statistics were sufficient to eliminate the file section. + * Otherwise, a list of columns for which page-level indices and dictionary could be consulted + * to potentially eliminate the file section. An Optional with an empty list is returned if there is + * going to be no benefit in looking at column index or dictionary for any column. */ - boolean matches(long numberOfRows, Map> statistics, ParquetDataSourceId id) + Optional> getIndexLookupCandidates(long numberOfRows, Map> statistics, ParquetDataSourceId id) throws ParquetCorruptionException; /** diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java index e40ecda99835..858e79220d6a 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java @@ -36,12 +36,14 @@ import org.apache.parquet.format.Util; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.MessageType; import org.joda.time.DateTimeZone; import java.io.IOException; +import java.io.UncheckedIOException; 
import java.math.BigDecimal; import java.util.Arrays; import java.util.List; @@ -49,7 +51,6 @@ import java.util.Optional; import java.util.Set; -import static com.google.common.base.Verify.verify; import static io.trino.parquet.ParquetCompressionUtils.decompress; import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding; import static io.trino.spi.type.BigintType.BIGINT; @@ -59,12 +60,20 @@ import static io.trino.spi.type.TinyintType.TINYINT; import static java.lang.Math.toIntExact; import static java.lang.String.format; +import static java.util.Objects.requireNonNull; import static org.apache.parquet.column.Encoding.BIT_PACKED; import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY; import static org.apache.parquet.column.Encoding.RLE; public final class PredicateUtils { + // Maximum size of dictionary that we will read for row-group pruning. + // Reading larger dictionaries is typically not beneficial. Before checking + // the dictionary, the row-group and page indexes have already been checked + // and when the dictionary does not eliminate a row-group, the work done to + // decode the dictionary and match it with predicates is wasted. 
+ private static final int MAX_DICTIONARY_SIZE = 8096; + private PredicateUtils() {} public static boolean isStatisticsOverflow(Type type, long min, long max) @@ -120,26 +129,41 @@ public static Predicate buildPredicate( return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build(), timeZone); } - public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map, ColumnDescriptor> descriptorsByPath, TupleDomain parquetTupleDomain) - throws IOException - { - return predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty()); - } - - public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map, ColumnDescriptor> descriptorsByPath, TupleDomain parquetTupleDomain, Optional columnIndexStore) + public static boolean predicateMatches( + Predicate parquetPredicate, + BlockMetaData block, + ParquetDataSource dataSource, + Map, ColumnDescriptor> descriptorsByPath, + TupleDomain parquetTupleDomain, + Optional columnIndexStore, + DateTimeZone timeZone) throws IOException { Map> columnStatistics = getStatistics(block, descriptorsByPath); - if (!parquetPredicate.matches(block.getRowCount(), columnStatistics, dataSource.getId())) { + Optional> candidateColumns = parquetPredicate.getIndexLookupCandidates(block.getRowCount(), columnStatistics, dataSource.getId()); + if (candidateColumns.isEmpty()) { return false; } + if (candidateColumns.get().isEmpty()) { + return true; + } + // Perform column index and dictionary lookups only for the subset of columns where it can be useful. + // This prevents unnecessary filesystem reads and decoding work when the predicate on a column comes from + // file-level min/max stats or more generally when the predicate selects a range equal to or wider than row-group min/max. 
+ Predicate indexPredicate = new TupleDomainParquetPredicate(parquetTupleDomain, candidateColumns.get(), timeZone); // Page stats is finer grained but relatively more expensive, so we do the filtering after above block filtering. - if (columnIndexStore.isPresent() && !parquetPredicate.matches(block.getRowCount(), columnIndexStore.get(), dataSource.getId())) { + if (columnIndexStore.isPresent() && !indexPredicate.matches(block.getRowCount(), columnIndexStore.get(), dataSource.getId())) { return false; } - return dictionaryPredicatesMatch(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain); + return dictionaryPredicatesMatch( + indexPredicate, + block, + dataSource, + descriptorsByPath, + ImmutableSet.copyOf(candidateColumns.get()), + columnIndexStore); } private static Map> getStatistics(BlockMetaData blockMetadata, Map, ColumnDescriptor> descriptorsByPath) @@ -157,50 +181,115 @@ private static Map> getStatistics(BlockMetaData return statistics.buildOrThrow(); } - private static boolean dictionaryPredicatesMatch(Predicate parquetPredicate, BlockMetaData blockMetadata, ParquetDataSource dataSource, Map, ColumnDescriptor> descriptorsByPath, TupleDomain parquetTupleDomain) + private static boolean dictionaryPredicatesMatch( + Predicate parquetPredicate, + BlockMetaData blockMetadata, + ParquetDataSource dataSource, + Map, ColumnDescriptor> descriptorsByPath, + Set candidateColumns, + Optional columnIndexStore) throws IOException { for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) { ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray())); - if (descriptor != null) { - if (isOnlyDictionaryEncodingPages(columnMetaData) && isColumnPredicate(descriptor, parquetTupleDomain)) { - Slice buffer = dataSource.readFully(columnMetaData.getStartingPos(), toIntExact(columnMetaData.getTotalSize())); - // Early abort, predicate already filters block so no more dictionaries need be read - if 
(!parquetPredicate.matches(new DictionaryDescriptor(descriptor, readDictionaryPage(buffer, columnMetaData.getCodec())))) { - return false; - } + if (descriptor == null || !candidateColumns.contains(descriptor)) { + continue; + } + if (isOnlyDictionaryEncodingPages(columnMetaData)) { + Statistics columnStatistics = columnMetaData.getStatistics(); + boolean nullAllowed = columnStatistics == null || columnStatistics.getNumNulls() != 0; + // Early abort, predicate already filters block so no more dictionaries need be read + if (!parquetPredicate.matches(new DictionaryDescriptor( + descriptor, + nullAllowed, + readDictionaryPage(dataSource, columnMetaData, columnIndexStore)))) { + return false; } } } return true; } - private static Optional readDictionaryPage(Slice data, CompressionCodecName codecName) + private static Optional readDictionaryPage( + ParquetDataSource dataSource, + ColumnChunkMetaData columnMetaData, + Optional columnIndexStore) + throws IOException { - try { - SliceInput inputStream = data.getInput(); - PageHeader pageHeader = Util.readPageHeader(inputStream); + int dictionaryPageSize; + if (columnMetaData.getDictionaryPageOffset() == 0 || columnMetaData.getFirstDataPageOffset() <= columnMetaData.getDictionaryPageOffset()) { + /* + * See org.apache.parquet.hadoop.Offsets for reference. + * The offsets might not contain the proper values in the below cases: + * - The dictionaryPageOffset might not be set; in this case 0 is returned + * (0 cannot be a valid offset because of the MAGIC bytes) + * - The firstDataPageOffset might point to the dictionary page + * + * Such parquet files may have been produced by parquet-mr writers before PARQUET-1977. + * We find the dictionary page size from OffsetIndex if that exists, + * otherwise fallback to reading the full column chunk. 
+ */ + dictionaryPageSize = columnIndexStore.flatMap(index -> getDictionaryPageSize(index, columnMetaData)) + .orElseGet(() -> toIntExact(columnMetaData.getTotalSize())); + } + else { + dictionaryPageSize = toIntExact(columnMetaData.getFirstDataPageOffset() - columnMetaData.getDictionaryPageOffset()); + } + // Get the dictionary page header and the dictionary in single read + Slice buffer = dataSource.readFully(columnMetaData.getStartingPos(), dictionaryPageSize); + return readPageHeaderWithData(buffer.getInput()).map(data -> decodeDictionaryPage(data, columnMetaData)); + } - if (pageHeader.type != PageType.DICTIONARY_PAGE) { - return Optional.empty(); - } + private static Optional getDictionaryPageSize(ColumnIndexStore columnIndexStore, ColumnChunkMetaData columnMetaData) + { + OffsetIndex offsetIndex = columnIndexStore.getOffsetIndex(columnMetaData.getPath()); + if (offsetIndex == null) { + return Optional.empty(); + } + long rowGroupOffset = columnMetaData.getStartingPos(); + long firstPageOffset = offsetIndex.getOffset(0); + if (rowGroupOffset < firstPageOffset) { + return Optional.of(toIntExact(firstPageOffset - rowGroupOffset)); + } + return Optional.empty(); + } - Slice compressedData = inputStream.readSlice(pageHeader.getCompressed_page_size()); - DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); - ParquetEncoding encoding = getParquetEncoding(Encoding.valueOf(dicHeader.getEncoding().name())); - int dictionarySize = dicHeader.getNum_values(); + private static Optional readPageHeaderWithData(SliceInput inputStream) + { + PageHeader pageHeader; + try { + pageHeader = Util.readPageHeader(inputStream); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } - return Optional.of(new DictionaryPage(decompress(codecName, compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding)); + if (pageHeader.type != PageType.DICTIONARY_PAGE) { + return Optional.empty(); } - catch (IOException ignored) { + 
DictionaryPageHeader dictionaryHeader = pageHeader.getDictionary_page_header(); + if (dictionaryHeader.getNum_values() > MAX_DICTIONARY_SIZE) { return Optional.empty(); } + return Optional.of(new PageHeaderWithData( + pageHeader, + inputStream.readSlice(pageHeader.getCompressed_page_size()))); } - private static boolean isColumnPredicate(ColumnDescriptor columnDescriptor, TupleDomain parquetTupleDomain) + private static DictionaryPage decodeDictionaryPage(PageHeaderWithData pageHeaderWithData, ColumnChunkMetaData chunkMetaData) { - verify(parquetTupleDomain.getDomains().isPresent(), "parquetTupleDomain is empty"); - return parquetTupleDomain.getDomains().get().containsKey(columnDescriptor); + PageHeader pageHeader = pageHeaderWithData.pageHeader(); + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + ParquetEncoding encoding = getParquetEncoding(Encoding.valueOf(dicHeader.getEncoding().name())); + int dictionarySize = dicHeader.getNum_values(); + + Slice compressedData = pageHeaderWithData.compressedData(); + try { + return new DictionaryPage(decompress(chunkMetaData.getCodec(), compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding); + } + catch (IOException e) { + throw new ParquetDecodingException("Could not decode the dictionary for " + chunkMetaData.getPath(), e); + } } @VisibleForTesting @@ -224,4 +313,13 @@ static boolean isOnlyDictionaryEncodingPages(ColumnChunkMetaData columnMetaData) return false; } + + private record PageHeaderWithData(PageHeader pageHeader, Slice compressedData) + { + private PageHeaderWithData(PageHeader pageHeader, Slice compressedData) + { + this.pageHeader = requireNonNull(pageHeader, "pageHeader is null"); + this.compressedData = requireNonNull(compressedData, "compressedData is null"); + } + } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java 
b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java index 4a226ce79689..f691b9fc457a 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java @@ -91,18 +91,19 @@ public TupleDomainParquetPredicate(TupleDomain effectivePredic } @Override - public boolean matches(long numberOfRows, Map> statistics, ParquetDataSourceId id) + public Optional> getIndexLookupCandidates(long numberOfRows, Map> statistics, ParquetDataSourceId id) throws ParquetCorruptionException { if (numberOfRows == 0) { - return false; + return Optional.empty(); } if (effectivePredicate.isNone()) { - return false; + return Optional.empty(); } Map effectivePredicateDomains = effectivePredicate.getDomains() .orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains")); + ImmutableList.Builder candidateColumns = ImmutableList.builder(); for (ColumnDescriptor column : columns) { Domain effectivePredicateDomain = effectivePredicateDomains.get(column); if (effectivePredicateDomain == null) { @@ -112,6 +113,7 @@ public boolean matches(long numberOfRows, Map> s Statistics columnStatistics = statistics.get(column); if (columnStatistics == null || columnStatistics.isEmpty()) { // no stats for column + candidateColumns.add(column); continue; } @@ -123,10 +125,15 @@ public boolean matches(long numberOfRows, Map> s id, timeZone); if (!effectivePredicateDomain.overlaps(domain)) { - return false; + return Optional.empty(); + } + // If the predicate domain on a column includes the entire domain from column row-group statistics, + // then more granular statistics from page stats or dictionary for this column will not help to eliminate the row-group. 
+ if (!effectivePredicateDomain.contains(domain)) { + candidateColumns.add(column); } } - return true; + return Optional.of(candidateColumns.build()); } @Override @@ -261,7 +268,7 @@ private static Domain getDomain( } if (type.equals(BIGINT) || type.equals(INTEGER) || type.equals(DATE) || type.equals(SMALLINT) || type.equals(TINYINT)) { - List ranges = new ArrayList<>(); + List ranges = new ArrayList<>(minimums.size()); for (int i = 0; i < minimums.size(); i++) { long min = asLong(minimums.get(i)); long max = asLong(maximums.get(i)); @@ -277,7 +284,7 @@ private static Domain getDomain( if (type instanceof DecimalType) { DecimalType decimalType = (DecimalType) type; - List ranges = new ArrayList<>(); + List ranges = new ArrayList<>(minimums.size()); if (decimalType.isShort()) { for (int i = 0; i < minimums.size(); i++) { Object min = minimums.get(i); @@ -306,7 +313,7 @@ private static Domain getDomain( } if (type.equals(REAL)) { - List ranges = new ArrayList<>(); + List ranges = new ArrayList<>(minimums.size()); for (int i = 0; i < minimums.size(); i++) { Float min = (Float) minimums.get(i); Float max = (Float) maximums.get(i); @@ -321,7 +328,7 @@ private static Domain getDomain( } if (type.equals(DOUBLE)) { - List ranges = new ArrayList<>(); + List ranges = new ArrayList<>(minimums.size()); for (int i = 0; i < minimums.size(); i++) { Double min = (Double) minimums.get(i); Double max = (Double) maximums.get(i); @@ -336,7 +343,7 @@ private static Domain getDomain( } if (type instanceof VarcharType) { - List ranges = new ArrayList<>(); + List ranges = new ArrayList<>(minimums.size()); for (int i = 0; i < minimums.size(); i++) { Slice min = Slices.wrappedBuffer(((Binary) minimums.get(i)).toByteBuffer()); Slice max = Slices.wrappedBuffer(((Binary) maximums.get(i)).toByteBuffer()); @@ -348,7 +355,7 @@ private static Domain getDomain( if (type instanceof TimestampType) { if (column.getPrimitiveType().getPrimitiveTypeName().equals(INT96)) { TrinoTimestampEncoder 
timestampEncoder = createTimestampEncoder((TimestampType) type, timeZone); - List values = new ArrayList<>(); + List values = new ArrayList<>(minimums.size()); for (int i = 0; i < minimums.size(); i++) { Object min = minimums.get(i); Object max = maximums.get(i); @@ -379,7 +386,7 @@ private static Domain getDomain( } TrinoTimestampEncoder timestampEncoder = createTimestampEncoder((TimestampType) type, DateTimeZone.UTC); - List ranges = new ArrayList<>(); + List ranges = new ArrayList<>(minimums.size()); for (int i = 0; i < minimums.size(); i++) { long min = (long) minimums.get(i); long max = (long) maximums.get(i); @@ -438,8 +445,8 @@ public static Domain getDomain( int pageCount = minValues.size(); ColumnIndexValueConverter converter = new ColumnIndexValueConverter(); Function converterFunction = converter.getConverter(descriptor.getPrimitiveType()); - List min = new ArrayList<>(); - List max = new ArrayList<>(); + List min = new ArrayList<>(pageCount); + List max = new ArrayList<>(pageCount); for (int i = 0; i < pageCount; i++) { if (nullPages.get(i)) { continue; @@ -486,18 +493,21 @@ private static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescri int dictionarySize = dictionaryPage.get().getDictionarySize(); if (dictionarySize == 0) { - return Domain.onlyNull(type); + if (dictionaryDescriptor.isNullAllowed()) { + return Domain.onlyNull(type); + } + return Domain.none(type); } DictionaryValueConverter converter = new DictionaryValueConverter(dictionary); Function convertFunction = converter.getConverter(columnDescriptor.getPrimitiveType()); - List values = new ArrayList<>(); + List values = new ArrayList<>(dictionarySize); for (int i = 0; i < dictionarySize; i++) { values.add(convertFunction.apply(i)); } // TODO: when min == max (i.e., singleton ranges, the construction of Domains can be done more efficiently - return getDomain(columnDescriptor, type, values, values, true, timeZone); + return getDomain(columnDescriptor, type, values, values, 
dictionaryDescriptor.isNullAllowed(), timeZone); } private static ParquetCorruptionException corruptionException(String column, ParquetDataSourceId id, Statistics statistics, Exception cause) diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java b/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java index d194f3066a87..01a8f2bae0b9 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java @@ -13,6 +13,7 @@ */ package io.trino.parquet; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.math.LongMath; import io.airlift.slice.Slice; @@ -554,7 +555,8 @@ public void testVarcharMatchesWithStatistics() .withMax(value.getBytes(UTF_8)) .withNumNulls(1L) .build(); - assertTrue(parquetPredicate.matches(2, ImmutableMap.of(column, stats), ID)); + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(column, stats), ID)) + .isEqualTo(Optional.of(ImmutableList.of(column))); } @Test(dataProvider = "typeForParquetInt32") @@ -567,9 +569,11 @@ public void testIntegerMatchesWithStatistics(Type typeForParquetInt32) Domain.create(ValueSet.of(typeForParquetInt32, 42L, 43L, 44L, 112L), false))); TupleDomainParquetPredicate parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, singletonList(column), UTC); - assertTrue(parquetPredicate.matches(2, ImmutableMap.of(column, intColumnStats(32, 42)), ID)); - assertFalse(parquetPredicate.matches(2, ImmutableMap.of(column, intColumnStats(30, 40)), ID)); - assertEquals(parquetPredicate.matches(2, ImmutableMap.of(column, intColumnStats(1024, 0x10000 + 42)), ID), (typeForParquetInt32 != INTEGER)); // stats invalid for smallint/tinyint + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(column, intColumnStats(32, 42)), 
ID)) + .isEqualTo(Optional.of(ImmutableList.of(column))); + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(column, intColumnStats(30, 40)), ID)).isEmpty(); + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(column, intColumnStats(1024, 0x10000 + 42)), ID).isPresent()) + .isEqualTo(typeForParquetInt32 != INTEGER); // stats invalid for smallint/tinyint } @DataProvider @@ -592,9 +596,10 @@ public void testBigintMatchesWithStatistics() Domain.create(ValueSet.of(BIGINT, 42L, 43L, 44L, 404L), false))); TupleDomainParquetPredicate parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, singletonList(column), UTC); - assertTrue(parquetPredicate.matches(2, ImmutableMap.of(column, longColumnStats(32, 42)), ID)); - assertFalse(parquetPredicate.matches(2, ImmutableMap.of(column, longColumnStats(30, 40)), ID)); - assertFalse(parquetPredicate.matches(2, ImmutableMap.of(column, longColumnStats(1024, 0x10000 + 42)), ID)); + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(column, longColumnStats(32, 42)), ID)) + .isEqualTo(Optional.of(ImmutableList.of(column))); + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(column, longColumnStats(30, 40)), ID)).isEmpty(); + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(column, longColumnStats(1024, 0x10000 + 42)), ID)).isEmpty(); } @Test @@ -604,7 +609,15 @@ public void testVarcharMatchesWithDictionaryDescriptor() TupleDomain effectivePredicate = getEffectivePredicate(column, createVarcharType(255), EMPTY_SLICE); TupleDomainParquetPredicate parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, singletonList(column), UTC); DictionaryPage page = new DictionaryPage(Slices.wrappedBuffer(new byte[] {0, 0, 0, 0}), 1, PLAIN_DICTIONARY); - assertTrue(parquetPredicate.matches(new DictionaryDescriptor(column, Optional.of(page)))); + assertTrue(parquetPredicate.matches(new DictionaryDescriptor(column, 
true, Optional.of(page)))); + assertTrue(parquetPredicate.matches(new DictionaryDescriptor(column, false, Optional.of(page)))); + + effectivePredicate = withColumnDomains(ImmutableMap.of( + column, + singleValue(createVarcharType(255), Slices.utf8Slice("abc"), true))); + parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, singletonList(column), UTC); + assertTrue(parquetPredicate.matches(new DictionaryDescriptor(column, true, Optional.of(page)))); + assertFalse(parquetPredicate.matches(new DictionaryDescriptor(column, false, Optional.of(page)))); } @Test @@ -622,21 +635,59 @@ public void testEmptyDictionary() withColumnDomains(singletonMap(descriptor, notNull(type))), singletonList(column), UTC); - assertFalse(predicate.matches(new DictionaryDescriptor(column, Optional.of(dictionary)))); + assertFalse(predicate.matches(new DictionaryDescriptor(column, true, Optional.of(dictionary)))); + assertFalse(predicate.matches(new DictionaryDescriptor(column, false, Optional.of(dictionary)))); // only nulls allowed predicate = new TupleDomainParquetPredicate( withColumnDomains(singletonMap(descriptor, onlyNull(type))), singletonList(column), UTC); - assertTrue(predicate.matches(new DictionaryDescriptor(column, Optional.of(dictionary)))); + assertTrue(predicate.matches(new DictionaryDescriptor(column, true, Optional.of(dictionary)))); + assertFalse(predicate.matches(new DictionaryDescriptor(column, false, Optional.of(dictionary)))); // mixed non-nulls and nulls allowed predicate = new TupleDomainParquetPredicate( withColumnDomains(singletonMap(descriptor, singleValue(type, EMPTY_SLICE, true))), singletonList(column), UTC); - assertTrue(predicate.matches(new DictionaryDescriptor(column, Optional.of(dictionary)))); + assertTrue(predicate.matches(new DictionaryDescriptor(column, true, Optional.of(dictionary)))); + assertFalse(predicate.matches(new DictionaryDescriptor(column, false, Optional.of(dictionary)))); + } + + @Test + public void 
testIndexLookupCandidates() + throws ParquetCorruptionException + { + ColumnDescriptor columnA = new ColumnDescriptor(new String[] {"pathA"}, Types.optional(INT64).named("Test column A"), 0, 0); + ColumnDescriptor columnB = new ColumnDescriptor(new String[] {"pathB"}, Types.optional(INT64).named("Test column B"), 0, 0); + TupleDomain effectivePredicate = TupleDomain.withColumnDomains(ImmutableMap.of( + columnA, + Domain.create(ValueSet.of(BIGINT, 42L, 43L, 44L, 404L), false), + columnB, + Domain.create(ValueSet.ofRanges(range(BIGINT, 42L, true, 404L, true)), false))); + + TupleDomainParquetPredicate parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, singletonList(columnA), UTC); + assertThat(parquetPredicate.getIndexLookupCandidates( + 2, + ImmutableMap.of(columnA, longColumnStats(32, 42), columnB, longColumnStats(42, 500)), ID)) + .isEqualTo(Optional.of(ImmutableList.of(columnA))); + + parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, ImmutableList.of(columnA, columnB), UTC); + // column stats missing on columnB + assertThat(parquetPredicate.getIndexLookupCandidates(2, ImmutableMap.of(columnA, longColumnStats(32, 42)), ID)) + .isEqualTo(Optional.of(ImmutableList.of(columnA, columnB))); + + // All possible values for columnB are covered by effectivePredicate + assertThat(parquetPredicate.getIndexLookupCandidates( + 2, + ImmutableMap.of(columnA, longColumnStats(32, 42), columnB, longColumnStats(50, 400)), ID)) + .isEqualTo(Optional.of(ImmutableList.of(columnA))); + + assertThat(parquetPredicate.getIndexLookupCandidates( + 2, + ImmutableMap.of(columnA, longColumnStats(32, 42), columnB, longColumnStats(42, 500)), ID)) + .isEqualTo(Optional.of(ImmutableList.of(columnA, columnB))); } @Test @@ -751,6 +802,7 @@ private DictionaryDescriptor floatDictionaryDescriptor(float... 
values) } return new DictionaryDescriptor( new ColumnDescriptor(new String[] {"dummy"}, new PrimitiveType(OPTIONAL, FLOAT, 0, "FloatColumn"), 1, 1), + true, Optional.of(new DictionaryPage(Slices.wrappedBuffer(buf.toByteArray()), values.length, PLAIN_DICTIONARY))); } @@ -765,6 +817,7 @@ private DictionaryDescriptor doubleDictionaryDescriptor(double... values) } return new DictionaryDescriptor( new ColumnDescriptor(new String[] {"dummy"}, new PrimitiveType(OPTIONAL, PrimitiveTypeName.DOUBLE, 0, "DoubleColumn"), 1, 1), + true, Optional.of(new DictionaryPage(Slices.wrappedBuffer(buf.toByteArray()), values.length, PLAIN_DICTIONARY))); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/predicate/BenchmarkTupleDomainParquetPredicate.java b/lib/trino-parquet/src/test/java/io/trino/parquet/predicate/BenchmarkTupleDomainParquetPredicate.java index 2e0a192e2f5d..13f8df3cff3d 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/predicate/BenchmarkTupleDomainParquetPredicate.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/predicate/BenchmarkTupleDomainParquetPredicate.java @@ -93,6 +93,7 @@ private DictionaryDescriptor createBigintDictionary() return new DictionaryDescriptor( new ColumnDescriptor(new String[] {"path"}, Types.optional(INT64).named("Test column"), 0, 0), + true, Optional.of( new DictionaryPage( slice, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index 37eb876d144f..7eb069ac9f6b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -241,7 +241,7 @@ public static ReaderPageSource createPageSource( long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); Optional columnIndex = getColumnIndexStore(dataSource, block, 
descriptorsByPath, parquetTupleDomain, options); if (start <= firstDataPage && firstDataPage < start + length - && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndex)) { + && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, columnIndex, timeZone)) { blocks.add(block); blockStarts.add(nextStart); columnIndexes.add(columnIndex); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index bae25dcf0547..38d02f8bc915 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -981,7 +981,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( for (BlockMetaData block : parquetMetadata.getBlocks()) { long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); if (start <= firstDataPage && firstDataPage < start + length && - predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) { + predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), UTC)) { blocks.add(block); blockStarts.add(nextStart); if (startRowPosition.isEmpty()) {