Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,8 @@
*/
package io.prestosql.parquet;

import io.prestosql.spi.PrestoException;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
import io.prestosql.spi.type.DateType;
import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.RealType;
import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarcharType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.ColumnIOFactory;
Expand All @@ -36,7 +25,6 @@
import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -46,8 +34,6 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

Expand Down Expand Up @@ -159,46 +145,6 @@ private static int getPathIndex(List<PrimitiveColumnIO> columns, List<String> pa
return index;
}

public static Type getPrestoType(TupleDomain<ColumnDescriptor> effectivePredicate, RichColumnDescriptor descriptor)
{
switch (descriptor.getType()) {
case BOOLEAN:
return BooleanType.BOOLEAN;
case BINARY:
return createDecimalType(descriptor).orElse(createVarcharType(effectivePredicate, descriptor));
case FLOAT:
return RealType.REAL;
case DOUBLE:
return DoubleType.DOUBLE;
case INT32:
return getInt32Type(descriptor);
case INT64:
return createDecimalType(descriptor).orElse(BigintType.BIGINT);
case INT96:
return TimestampType.TIMESTAMP;
case FIXED_LEN_BYTE_ARRAY:
return createDecimalType(descriptor).orElseThrow(() -> new PrestoException(NOT_SUPPORTED, "Parquet type FIXED_LEN_BYTE_ARRAY supported as DECIMAL; got " + descriptor.getPrimitiveType().getOriginalType()));
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType());
}
}

private static Type createVarcharType(TupleDomain<ColumnDescriptor> effectivePredicate, RichColumnDescriptor column)
{
// We look at the effectivePredicate domain here, because it matches the Hive column type
// more accurately than the type available in the RichColumnDescriptor.
// For example, a Hive column of type varchar(length) is encoded as a Parquet BINARY, but
// when that is converted to a Presto Type the length information wasn't retained.
Optional<Map<ColumnDescriptor, Domain>> predicateDomains = effectivePredicate.getDomains();
if (predicateDomains.isPresent()) {
Domain domain = predicateDomains.get().get(column);
if (domain != null) {
return domain.getType();
}
}
return VarcharType.VARCHAR;
}

public static int getFieldIndex(MessageType fileSchema, String name)
{
try {
Expand Down Expand Up @@ -316,21 +262,4 @@ public static long getShortDecimalValue(byte[] bytes)

return value;
}

private static Type getInt32Type(RichColumnDescriptor descriptor)
{
OriginalType originalType = descriptor.getPrimitiveType().getOriginalType();
if (originalType == null) {
return INTEGER;
}

switch (originalType) {
case DECIMAL:
return createDecimalType(descriptor.getPrimitiveType().getDecimalMetadata());
case DATE:
return DateType.DATE;
default:
return INTEGER;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package io.prestosql.parquet.predicate;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.prestosql.parquet.DictionaryPage;
Expand Down Expand Up @@ -44,7 +44,6 @@
import java.util.Optional;
import java.util.function.Function;

import static io.prestosql.parquet.ParquetTypeUtils.getPrestoType;
import static io.prestosql.parquet.predicate.PredicateUtils.isStatisticsOverflow;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
Expand Down Expand Up @@ -78,42 +77,53 @@ public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> s
if (numberOfRows == 0) {
return false;
}
ImmutableMap.Builder<ColumnDescriptor, Domain> domains = ImmutableMap.builder();
if (effectivePredicate.isNone()) {
return false;
}
Map<ColumnDescriptor, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
.orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains"));

for (RichColumnDescriptor column : columns) {
Statistics<?> columnStatistics = statistics.get(column);
Domain effectivePredicateDomain = effectivePredicateDomains.get(column);
if (effectivePredicateDomain == null) {
continue;
}

Domain domain;
Type type = getPrestoType(effectivePredicate, column);
Statistics<?> columnStatistics = statistics.get(column);
if (columnStatistics == null || columnStatistics.isEmpty()) {
// no stats for column
domain = Domain.all(type);
}
else {
domain = getDomain(type, numberOfRows, columnStatistics, id, column.toString(), failOnCorruptedParquetStatistics);
Domain domain = getDomain(effectivePredicateDomain.getType(), numberOfRows, columnStatistics, id, column.toString(), failOnCorruptedParquetStatistics);
if (effectivePredicateDomain.intersect(domain).isNone()) {
return false;
}
}
domains.put(column, domain);
}
TupleDomain<ColumnDescriptor> stripeDomain = TupleDomain.withColumnDomains(domains.build());

return effectivePredicate.overlaps(stripeDomain);
return true;
}

@Override
public boolean matches(Map<ColumnDescriptor, DictionaryDescriptor> dictionaries)
{
ImmutableMap.Builder<ColumnDescriptor, Domain> domains = ImmutableMap.builder();
if (effectivePredicate.isNone()) {
return false;
}
Map<ColumnDescriptor, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
.orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains"));

for (RichColumnDescriptor column : columns) {
Domain effectivePredicateDomain = effectivePredicateDomains.get(column);
if (effectivePredicateDomain == null) {
continue;
}
DictionaryDescriptor dictionaryDescriptor = dictionaries.get(column);
Domain domain = getDomain(getPrestoType(effectivePredicate, column), dictionaryDescriptor);
if (domain != null) {
domains.put(column, domain);
Domain domain = getDomain(effectivePredicateDomain.getType(), dictionaryDescriptor);
if (effectivePredicateDomain.intersect(domain).isNone()) {
return false;
}
}
TupleDomain<ColumnDescriptor> stripeDomain = TupleDomain.withColumnDomains(domains.build());

return effectivePredicate.overlaps(stripeDomain);
return true;
}

@VisibleForTesting
Expand Down Expand Up @@ -148,8 +158,11 @@ public static Domain getDomain(Type type, long rowCount, Statistics<?> statistic
if (hasFalseValues) {
return Domain.create(ValueSet.of(type, false), hasNullValue);
}
// All nulls case is handled earlier
throw new VerifyException("Impossible boolean statistics");
}
else if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) || type.equals(INTEGER)) && (statistics instanceof LongStatistics || statistics instanceof IntStatistics)) {

if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) || type.equals(INTEGER)) && (statistics instanceof LongStatistics || statistics instanceof IntStatistics)) {
ParquetIntegerStatistics parquetIntegerStatistics;
if (statistics instanceof LongStatistics) {
LongStatistics longStatistics = (LongStatistics) statistics;
Expand All @@ -172,7 +185,8 @@ else if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) |
}
return createDomain(type, hasNullValue, parquetIntegerStatistics);
}
else if (type.equals(REAL) && statistics instanceof FloatStatistics) {

if (type.equals(REAL) && statistics instanceof FloatStatistics) {
FloatStatistics floatStatistics = (FloatStatistics) statistics;
if (floatStatistics.genericGetMin() > floatStatistics.genericGetMax()) {
failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, floatStatistics);
Expand All @@ -185,7 +199,8 @@ else if (type.equals(REAL) && statistics instanceof FloatStatistics) {

return createDomain(type, hasNullValue, parquetStatistics);
}
else if (type.equals(DOUBLE) && statistics instanceof DoubleStatistics) {

if (type.equals(DOUBLE) && statistics instanceof DoubleStatistics) {
DoubleStatistics doubleStatistics = (DoubleStatistics) statistics;
if (doubleStatistics.genericGetMin() > doubleStatistics.genericGetMax()) {
failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, doubleStatistics);
Expand All @@ -194,7 +209,8 @@ else if (type.equals(DOUBLE) && statistics instanceof DoubleStatistics) {
ParquetDoubleStatistics parquetDoubleStatistics = new ParquetDoubleStatistics(doubleStatistics.genericGetMin(), doubleStatistics.genericGetMax());
return createDomain(type, hasNullValue, parquetDoubleStatistics);
}
else if (isVarcharType(type) && statistics instanceof BinaryStatistics) {

if (isVarcharType(type) && statistics instanceof BinaryStatistics) {
BinaryStatistics binaryStatistics = (BinaryStatistics) statistics;
Slice minSlice = Slices.wrappedBuffer(binaryStatistics.getMin().getBytes());
Slice maxSlice = Slices.wrappedBuffer(binaryStatistics.getMax().getBytes());
Expand All @@ -205,7 +221,8 @@ else if (isVarcharType(type) && statistics instanceof BinaryStatistics) {
ParquetStringStatistics parquetStringStatistics = new ParquetStringStatistics(minSlice, maxSlice);
return createDomain(type, hasNullValue, parquetStringStatistics);
}
else if (type.equals(DATE) && statistics instanceof IntStatistics) {

if (type.equals(DATE) && statistics instanceof IntStatistics) {
IntStatistics intStatistics = (IntStatistics) statistics;
if (intStatistics.genericGetMin() > intStatistics.genericGetMax()) {
failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, intStatistics);
Expand All @@ -214,20 +231,21 @@ else if (type.equals(DATE) && statistics instanceof IntStatistics) {
ParquetIntegerStatistics parquetIntegerStatistics = new ParquetIntegerStatistics((long) intStatistics.getMin(), (long) intStatistics.getMax());
return createDomain(type, hasNullValue, parquetIntegerStatistics);
}

return Domain.create(ValueSet.all(type), hasNullValue);
}

@VisibleForTesting
public static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescriptor)
{
if (dictionaryDescriptor == null) {
return null;
return Domain.all(type);
}

ColumnDescriptor columnDescriptor = dictionaryDescriptor.getColumnDescriptor();
Optional<DictionaryPage> dictionaryPage = dictionaryDescriptor.getDictionaryPage();
if (!dictionaryPage.isPresent()) {
return null;
return Domain.all(type);
}

Dictionary dictionary;
Expand All @@ -237,7 +255,8 @@ public static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescrip
catch (Exception e) {
// In case of exception, just continue reading the data, not using dictionary page at all
// OK to ignore exception when reading dictionaries
return null;
// TODO take failOnCorruptedParquetStatistics parameter and handle appropriately
return Domain.all(type);
}

int dictionarySize = dictionaryPage.get().getDictionarySize();
Expand All @@ -249,39 +268,44 @@ public static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescrip
domains.add(Domain.onlyNull(type));
return Domain.union(domains);
}
else if ((type.equals(BIGINT) || type.equals(DATE)) && columnDescriptor.getType() == PrimitiveTypeName.INT32) {

if ((type.equals(BIGINT) || type.equals(DATE)) && columnDescriptor.getType() == PrimitiveTypeName.INT32) {
List<Domain> domains = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
domains.add(Domain.singleValue(type, (long) dictionary.decodeToInt(i)));
}
domains.add(Domain.onlyNull(type));
return Domain.union(domains);
}
else if (type.equals(DOUBLE) && columnDescriptor.getType() == PrimitiveTypeName.DOUBLE) {

if (type.equals(DOUBLE) && columnDescriptor.getType() == PrimitiveTypeName.DOUBLE) {
List<Domain> domains = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
domains.add(Domain.singleValue(type, dictionary.decodeToDouble(i)));
}
domains.add(Domain.onlyNull(type));
return Domain.union(domains);
}
else if (type.equals(DOUBLE) && columnDescriptor.getType() == PrimitiveTypeName.FLOAT) {

if (type.equals(DOUBLE) && columnDescriptor.getType() == PrimitiveTypeName.FLOAT) {
List<Domain> domains = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
domains.add(Domain.singleValue(type, (double) dictionary.decodeToFloat(i)));
}
domains.add(Domain.onlyNull(type));
return Domain.union(domains);
}
else if (isVarcharType(type) && columnDescriptor.getType() == PrimitiveTypeName.BINARY) {

if (isVarcharType(type) && columnDescriptor.getType() == PrimitiveTypeName.BINARY) {
List<Domain> domains = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
domains.add(Domain.singleValue(type, Slices.wrappedBuffer(dictionary.decodeToBinary(i).getBytes())));
}
domains.add(Domain.onlyNull(type));
return Domain.union(domains);
}
return null;

return Domain.all(type);
}

private static void failWithCorruptionException(boolean failOnCorruptedParquetStatistics, String column, ParquetDataSourceId id, Statistics statistics)
Expand Down

This file was deleted.

Loading