Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.DateType;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.RealType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
Expand All @@ -36,6 +36,7 @@
import parquet.io.PrimitiveColumnIO;
import parquet.schema.DecimalMetadata;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -45,6 +46,7 @@
import java.util.Optional;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.google.common.base.Preconditions.checkArgument;
import static parquet.schema.OriginalType.DECIMAL;
import static parquet.schema.Type.Repetition.REPEATED;
Expand Down Expand Up @@ -169,7 +171,7 @@ public static Type getPrestoType(TupleDomain<ColumnDescriptor> effectivePredicat
case DOUBLE:
return DoubleType.DOUBLE;
case INT32:
return createDecimalType(descriptor).orElse(IntegerType.INTEGER);
return getInt32Type(descriptor);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the INT64 and INT32 cases handled differently now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because Parquet has Logical Types where additional metadata in originalType determines how an int32 can be interpreted. In this case, it's checking if the int32 was configured to be interpreted as a date (number of days since Unix epoch which matches the semantics of the Presto Date type).

Before this change if I had a query like:

select count(*) from my_table where date_col = DATE '2018-01-01'

The query would fail with a type mismatch exception about comparing a date value to integer because the logical type wasn't considered. There's a similar issue related to this here - #11118 - which is more involved as I was only targeting the date pushdown currently.

I backed out the timestamp changes but the case statement for int64 in the future would be adjusted in a similar way to consider logical types e.g. an int64 can be logically typed as a timestamp where the value is in epoch millis, micro or nanos.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this, #11118 actually solves this differently by adding the HiveType to RichColumnDescriptor then determining the mapped Presto type for an int32 based on that here

The outcome should be the same though.

Copy link
Copy Markdown
Contributor Author

@ryanrupp ryanrupp Nov 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some unit tests in 1f78d8e3c0dfa4ac8e9ad7a8e2b9f53409a55e05 for this in case this changes in the future.

case INT64:
return createDecimalType(descriptor).orElse(BigintType.BIGINT);
case INT96:
Expand Down Expand Up @@ -278,8 +280,12 @@ public static Optional<Type> createDecimalType(RichColumnDescriptor descriptor)
if (descriptor.getPrimitiveType().getOriginalType() != DECIMAL) {
return Optional.empty();
}
DecimalMetadata decimalMetadata = descriptor.getPrimitiveType().getDecimalMetadata();
return Optional.of(DecimalType.createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale()));
return Optional.of(createDecimalType(descriptor.getPrimitiveType().getDecimalMetadata()));
}

/**
 * Builds a Presto decimal type from Parquet decimal metadata.
 *
 * @param decimalMetadata metadata supplying the precision and scale
 * @return the result of {@code DecimalType.createDecimalType(precision, scale)}
 */
private static Type createDecimalType(DecimalMetadata decimalMetadata)
{
    int precision = decimalMetadata.getPrecision();
    int scale = decimalMetadata.getScale();
    return DecimalType.createDecimalType(precision, scale);
}

/**
Expand Down Expand Up @@ -309,4 +315,21 @@ public static long getShortDecimalValue(byte[] bytes)

return value;
}

/**
 * Picks the Presto type for an INT32 column based on the column's original type.
 * DECIMAL yields a decimal type built from the column's decimal metadata, DATE yields
 * {@code DateType.DATE}, and a missing or any other original type yields {@code INTEGER}.
 *
 * @param descriptor column descriptor whose primitive type is inspected
 * @return the mapped Presto type
 */
private static Type getInt32Type(RichColumnDescriptor descriptor)
{
    OriginalType logicalType = descriptor.getPrimitiveType().getOriginalType();
    if (logicalType == OriginalType.DECIMAL) {
        return createDecimalType(descriptor.getPrimitiveType().getDecimalMetadata());
    }
    if (logicalType == OriginalType.DATE) {
        return DateType.DATE;
    }
    // Reached when the original type is null or has no dedicated mapping
    return INTEGER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static com.facebook.presto.parquet.predicate.PredicateUtils.isStatisticsOverflow;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.RealType.REAL;
Expand Down Expand Up @@ -200,6 +201,15 @@ else if (isVarcharType(type) && statistics instanceof BinaryStatistics) {
ParquetStringStatistics parquetStringStatistics = new ParquetStringStatistics(minSlice, maxSlice);
return createDomain(type, hasNullValue, parquetStringStatistics);
}
else if (type.equals(DATE) && statistics instanceof IntStatistics) {
IntStatistics intStatistics = (IntStatistics) statistics;
// ignore corrupted statistics
if (intStatistics.genericGetMin() > intStatistics.genericGetMax()) {
return Domain.create(ValueSet.all(type), hasNullValue);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nezihyigitbasi do you know the reason why we silently ignore something? I see we do this already for other cases above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't exactly remember the history behind that. @zhenxiao do you remember why we are silently ignoring corrupt stats?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I remember, if the statistics are obviously incorrect/corrupted (in this case, min > max), we silently skip using them to evaluate predicates.
It should not error out: if stats are incorrect/corrupted, we can still scan the data to complete the query; we just lose a potential speedup.
Maybe add a log message here? @findepi @nezihyigitbasi what do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I prefer failing fast & loudly for such cases as that may uncover bugs/issues in whoever has written those files/stats.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Failing loudly is better. I will file a follow-up task to fix it.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here it is: #12036

}
ParquetIntegerStatistics parquetIntegerStatistics = new ParquetIntegerStatistics((long) intStatistics.getMin(), (long) intStatistics.getMax());
return createDomain(type, hasNullValue, parquetIntegerStatistics);
}
return Domain.create(ValueSet.all(type), hasNullValue);
}

Expand Down Expand Up @@ -235,7 +245,7 @@ public static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescrip
domains.add(Domain.onlyNull(type));
return Domain.union(domains);
}
else if (type.equals(BIGINT) && columnDescriptor.getType() == PrimitiveTypeName.INT32) {
else if ((type.equals(BIGINT) || type.equals(DATE)) && columnDescriptor.getType() == PrimitiveTypeName.INT32) {
List<Domain> domains = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
domains.add(Domain.singleValue(type, (long) dictionary.decodeToInt(i)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.parquet;

import com.facebook.presto.spi.predicate.TupleDomain;
import org.testng.annotations.Test;
import parquet.column.ColumnDescriptor;
import parquet.schema.OriginalType;
import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;

import static com.facebook.presto.parquet.ParquetTypeUtils.getPrestoType;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static org.testng.Assert.assertEquals;
import static parquet.schema.Type.Repetition.OPTIONAL;

/**
 * Checks how {@code ParquetTypeUtils.getPrestoType} maps INT32 primitive columns to Presto types.
 */
public class TestParquetTypeUtils
{
    @Test
    public void testMapInt32ToPrestoInteger()
    {
        PrimitiveType primitive = new PrimitiveType(OPTIONAL, PrimitiveTypeName.INT32, "int_col", OriginalType.INT_32);
        RichColumnDescriptor column = int32Column("int_col", primitive);
        assertEquals(getPrestoType(TupleDomain.all(), column), INTEGER);
    }

    @Test
    public void testMapInt32WithoutOriginalTypeToPrestoInteger()
    {
        // With no original type metadata, an int32 primitive is expected to map to Presto integer
        PrimitiveType primitive = new PrimitiveType(OPTIONAL, PrimitiveTypeName.INT32, "int_col");
        RichColumnDescriptor column = int32Column("int_col", primitive);
        assertEquals(getPrestoType(TupleDomain.all(), column), INTEGER);
    }

    @Test
    public void testMapInt32ToPrestoDate()
    {
        // An int32 primitive whose original type is DATE is expected to map to Presto date
        PrimitiveType primitive = new PrimitiveType(OPTIONAL, PrimitiveTypeName.INT32, "date_col", OriginalType.DATE);
        RichColumnDescriptor column = int32Column("date_col", primitive);
        assertEquals(getPrestoType(TupleDomain.all(), column), DATE);
    }

    /**
     * Wraps the given primitive type in a descriptor for a single-element INT32 column path,
     * using the same max repetition (0) and definition (1) levels in every test.
     */
    private static RichColumnDescriptor int32Column(String name, PrimitiveType primitive)
    {
        ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {name}, PrimitiveTypeName.INT32, 0, 1);
        return new RichColumnDescriptor(descriptor, primitive);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import parquet.column.statistics.BooleanStatistics;
import parquet.column.statistics.DoubleStatistics;
import parquet.column.statistics.FloatStatistics;
import parquet.column.statistics.IntStatistics;
import parquet.column.statistics.LongStatistics;
import parquet.column.statistics.Statistics;
import parquet.io.api.Binary;
Expand All @@ -45,6 +46,7 @@
import static com.facebook.presto.spi.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.RealType.REAL;
Expand Down Expand Up @@ -189,6 +191,16 @@ public void testFloat()
assertEquals(getDomain(REAL, 10, floatColumnStats(maximum, minimum)), create(ValueSet.all(REAL), false));
}

/**
 * Exercises {@code getDomain} for DATE backed by int statistics: absent statistics,
 * a single value, a closed range, and corrupt statistics whose minimum exceeds the maximum.
 */
@Test
public void testDate()
{
    // Absent statistics are expected to produce the unrestricted DATE domain
    assertEquals(getDomain(DATE, 0, null), all(DATE));

    IntStatistics singleValueStats = intColumnStats(100, 100);
    assertEquals(getDomain(DATE, 10, singleValueStats), singleValue(DATE, 100L));

    IntStatistics rangeStats = intColumnStats(0, 100);
    assertEquals(getDomain(DATE, 10, rangeStats), create(ValueSet.ofRanges(range(DATE, 0L, true, 100L, true)), false));

    // Corrupt statistics (minimum > maximum) are expected to be ignored
    IntStatistics corruptStats = intColumnStats(200, 100);
    assertEquals(getDomain(DATE, 10, corruptStats), create(ValueSet.all(DATE), false));
}

@Test
public void testMatchesWithStatistics()
{
Expand Down Expand Up @@ -228,4 +240,11 @@ private static FloatStatistics floatColumnStats(float minimum, float maximum)
statistics.setMinMax(minimum, maximum);
return statistics;
}

/**
 * Creates an {@link IntStatistics} instance and applies the given bounds via {@code setMinMax}.
 *
 * @param minimum value passed as the minimum
 * @param maximum value passed as the maximum
 * @return the populated statistics object
 */
private static IntStatistics intColumnStats(int minimum, int maximum)
{
    IntStatistics stats = new IntStatistics();
    stats.setMinMax(minimum, maximum);
    return stats;
}
}