Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
public class DictionaryDescriptor
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the nulls count in column statistics for a column is 0

Do you think it's possible for some writer to set it incorrectly?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If column statistics are incorrect then our existing predicate pushdown logic could also give wrong results, so we should be able to rely on it

{
private final ColumnDescriptor columnDescriptor;
private final boolean nullAllowed;
private final Optional<DictionaryPage> dictionaryPage;

/**
 * @param columnDescriptor column this dictionary belongs to
 * @param nullAllowed whether nulls may be present in the column chunk; derived by callers from column statistics
 * @param dictionaryPage the dictionary page, if one could be read for this column chunk
 */
public DictionaryDescriptor(ColumnDescriptor columnDescriptor, boolean nullAllowed, Optional<DictionaryPage> dictionaryPage)
{
    this.columnDescriptor = columnDescriptor;
    this.nullAllowed = nullAllowed;
    this.dictionaryPage = dictionaryPage;
}

Expand All @@ -34,6 +36,11 @@ public ColumnDescriptor getColumnDescriptor()
return columnDescriptor;
}

/**
 * Returns the nullAllowed flag supplied at construction — true when nulls
 * may be present in this column chunk (callers derive it from the column
 * statistics null count).
 */
public boolean isNullAllowed()
{
return nullAllowed;
}

public Optional<DictionaryPage> getDictionaryPage()
{
return dictionaryPage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,28 @@
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.joda.time.DateTimeZone;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface Predicate
{
/**
* Should the Parquet Reader process a file section with the specified statistics.
* Should the Parquet Reader process a file section with the specified statistics,
* and if it should, then return the columns that are candidates for further inspection of more
* granular statistics from column index and dictionary.
*
* @param numberOfRows the number of rows in the segment; this can be used with
* Statistics to determine if a column is only null
* @param statistics column statistics
* @param id Parquet file name
*
* @return Optional.empty() if statistics were sufficient to eliminate the file section.
* Otherwise, a list of columns for which page-level indices and dictionary could be consulted
* to potentially eliminate the file section. An optional with empty list is returned if there is
* going to be no benefit in looking at column index or dictionary for any column.
*/
boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id)
Optional<List<ColumnDescriptor>> getIndexLookupCandidates(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id)
throws ParquetCorruptionException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.MessageType;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Verify.verify;
import static io.trino.parquet.ParquetCompressionUtils.decompress;
import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding;
import static io.trino.spi.type.BigintType.BIGINT;
Expand All @@ -59,12 +60,20 @@
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.column.Encoding.BIT_PACKED;
import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.column.Encoding.RLE;

public final class PredicateUtils
{
// Maximum size of dictionary that we will read for row-group pruning.
// Reading larger dictionaries is typically not beneficial. Before checking
// the dictionary, the row-group and page indexes have already been checked
// and when the dictionary does not eliminate a row-group, the work done to
// decode the dictionary and match it with predicates is wasted.
private static final int MAX_DICTIONARY_SIZE = 8096;

private PredicateUtils() {}

public static boolean isStatisticsOverflow(Type type, long min, long max)
Expand Down Expand Up @@ -120,26 +129,41 @@ public static Predicate buildPredicate(
return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build(), timeZone);
}

public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, ColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
throws IOException
{
return predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty());
}

public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, ColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, Optional<ColumnIndexStore> columnIndexStore)
public static boolean predicateMatches(
Predicate parquetPredicate,
BlockMetaData block,
ParquetDataSource dataSource,
Map<List<String>, ColumnDescriptor> descriptorsByPath,
TupleDomain<ColumnDescriptor> parquetTupleDomain,
Optional<ColumnIndexStore> columnIndexStore,
DateTimeZone timeZone)
throws IOException
{
Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, descriptorsByPath);
if (!parquetPredicate.matches(block.getRowCount(), columnStatistics, dataSource.getId())) {
Optional<List<ColumnDescriptor>> candidateColumns = parquetPredicate.getIndexLookupCandidates(block.getRowCount(), columnStatistics, dataSource.getId());
if (candidateColumns.isEmpty()) {
return false;
}
if (candidateColumns.get().isEmpty()) {
return true;
}
// Perform column index and dictionary lookups only for the subset of columns where it can be useful.
// This prevents unnecessary filesystem reads and decoding work when the predicate on a column comes from
// file-level min/max stats or more generally when the predicate selects a range equal to or wider than row-group min/max.
Predicate indexPredicate = new TupleDomainParquetPredicate(parquetTupleDomain, candidateColumns.get(), timeZone);

// Page stats is finer grained but relatively more expensive, so we do the filtering after above block filtering.
if (columnIndexStore.isPresent() && !parquetPredicate.matches(block.getRowCount(), columnIndexStore.get(), dataSource.getId())) {
if (columnIndexStore.isPresent() && !indexPredicate.matches(block.getRowCount(), columnIndexStore.get(), dataSource.getId())) {
return false;
}

return dictionaryPredicatesMatch(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain);
return dictionaryPredicatesMatch(
indexPredicate,
block,
dataSource,
descriptorsByPath,
ImmutableSet.copyOf(candidateColumns.get()),
columnIndexStore);
}

private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData blockMetadata, Map<List<String>, ColumnDescriptor> descriptorsByPath)
Expand All @@ -157,50 +181,115 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData
return statistics.buildOrThrow();
}

private static boolean dictionaryPredicatesMatch(Predicate parquetPredicate, BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, ColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
private static boolean dictionaryPredicatesMatch(
Predicate parquetPredicate,
BlockMetaData blockMetadata,
ParquetDataSource dataSource,
Map<List<String>, ColumnDescriptor> descriptorsByPath,
Set<ColumnDescriptor> candidateColumns,
Optional<ColumnIndexStore> columnIndexStore)
throws IOException
{
for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
if (descriptor != null) {
if (isOnlyDictionaryEncodingPages(columnMetaData) && isColumnPredicate(descriptor, parquetTupleDomain)) {
Slice buffer = dataSource.readFully(columnMetaData.getStartingPos(), toIntExact(columnMetaData.getTotalSize()));
// Early abort, predicate already filters block so no more dictionaries need be read
if (!parquetPredicate.matches(new DictionaryDescriptor(descriptor, readDictionaryPage(buffer, columnMetaData.getCodec())))) {
return false;
}
if (descriptor == null || !candidateColumns.contains(descriptor)) {
continue;
}
if (isOnlyDictionaryEncodingPages(columnMetaData)) {
Statistics<?> columnStatistics = columnMetaData.getStatistics();
boolean nullAllowed = columnStatistics == null || columnStatistics.getNumNulls() != 0;
// Early abort, predicate already filters block so no more dictionaries need be read
if (!parquetPredicate.matches(new DictionaryDescriptor(
descriptor,
nullAllowed,
readDictionaryPage(dataSource, columnMetaData, columnIndexStore)))) {
return false;
}
}
}
return true;
}

private static Optional<DictionaryPage> readDictionaryPage(Slice data, CompressionCodecName codecName)
private static Optional<DictionaryPage> readDictionaryPage(
ParquetDataSource dataSource,
ColumnChunkMetaData columnMetaData,
Optional<ColumnIndexStore> columnIndexStore)
throws IOException
{
try {
SliceInput inputStream = data.getInput();
PageHeader pageHeader = Util.readPageHeader(inputStream);
int dictionaryPageSize;
if (columnMetaData.getDictionaryPageOffset() == 0 || columnMetaData.getFirstDataPageOffset() <= columnMetaData.getDictionaryPageOffset()) {
/*
* See org.apache.parquet.hadoop.Offsets for reference.
* The offsets might not contain the proper values in the below cases:
* - The dictionaryPageOffset might not be set; in this case 0 is returned
* (0 cannot be a valid offset because of the MAGIC bytes)
* - The firstDataPageOffset might point to the dictionary page
*
* Such parquet files may have been produced by parquet-mr writers before PARQUET-1977.
* We find the dictionary page size from OffsetIndex if that exists,
* otherwise fallback to reading the full column chunk.
*/
dictionaryPageSize = columnIndexStore.flatMap(index -> getDictionaryPageSize(index, columnMetaData))
.orElseGet(() -> toIntExact(columnMetaData.getTotalSize()));
}
else {
dictionaryPageSize = toIntExact(columnMetaData.getFirstDataPageOffset() - columnMetaData.getDictionaryPageOffset());
}
// Get the dictionary page header and the dictionary in single read
Slice buffer = dataSource.readFully(columnMetaData.getStartingPos(), dictionaryPageSize);
return readPageHeaderWithData(buffer.getInput()).map(data -> decodeDictionaryPage(data, columnMetaData));
}

if (pageHeader.type != PageType.DICTIONARY_PAGE) {
return Optional.empty();
}
private static Optional<Integer> getDictionaryPageSize(ColumnIndexStore columnIndexStore, ColumnChunkMetaData columnMetaData)
{
OffsetIndex offsetIndex = columnIndexStore.getOffsetIndex(columnMetaData.getPath());
if (offsetIndex == null) {
return Optional.empty();
}
long rowGroupOffset = columnMetaData.getStartingPos();
long firstPageOffset = offsetIndex.getOffset(0);
if (rowGroupOffset < firstPageOffset) {
return Optional.of(toIntExact(firstPageOffset - rowGroupOffset));
}
return Optional.empty();
}

/**
 * Reads a page header from the stream and, when it is a dictionary page of
 * acceptable size, the compressed dictionary bytes that follow it.
 *
 * @return empty when the first page is not a dictionary page or the dictionary is too large to be worth decoding
 */
private static Optional<PageHeaderWithData> readPageHeaderWithData(SliceInput inputStream)
{
    PageHeader pageHeader;
    try {
        pageHeader = Util.readPageHeader(inputStream);
    }
    catch (IOException e) {
        throw new UncheckedIOException(e);
    }

    if (pageHeader.type != PageType.DICTIONARY_PAGE) {
        return Optional.empty();
    }
    DictionaryPageHeader dictionaryHeader = pageHeader.getDictionary_page_header();
    if (dictionaryHeader.getNum_values() > MAX_DICTIONARY_SIZE) {
        // Decoding and matching a huge dictionary is rarely worth the cost
        return Optional.empty();
    }
    return Optional.of(new PageHeaderWithData(
            pageHeader,
            inputStream.readSlice(pageHeader.getCompressed_page_size())));
}

private static boolean isColumnPredicate(ColumnDescriptor columnDescriptor, TupleDomain<ColumnDescriptor> parquetTupleDomain)
private static DictionaryPage decodeDictionaryPage(PageHeaderWithData pageHeaderWithData, ColumnChunkMetaData chunkMetaData)
{
verify(parquetTupleDomain.getDomains().isPresent(), "parquetTupleDomain is empty");
return parquetTupleDomain.getDomains().get().containsKey(columnDescriptor);
PageHeader pageHeader = pageHeaderWithData.pageHeader();
DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
ParquetEncoding encoding = getParquetEncoding(Encoding.valueOf(dicHeader.getEncoding().name()));
int dictionarySize = dicHeader.getNum_values();

Slice compressedData = pageHeaderWithData.compressedData();
try {
return new DictionaryPage(decompress(chunkMetaData.getCodec(), compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding);
}
catch (IOException e) {
throw new ParquetDecodingException("Could not decode the dictionary for " + chunkMetaData.getPath(), e);
}
}

@VisibleForTesting
Expand All @@ -224,4 +313,13 @@ static boolean isOnlyDictionaryEncodingPages(ColumnChunkMetaData columnMetaData)

return false;
}

/**
 * Pairs a parsed page header with the (still compressed) page bytes that follow it.
 */
private record PageHeaderWithData(PageHeader pageHeader, Slice compressedData)
{
    // Compact canonical constructor: validates components, assignment is implicit
    private PageHeaderWithData
    {
        requireNonNull(pageHeader, "pageHeader is null");
        requireNonNull(compressedData, "compressedData is null");
    }
}
}
Loading