Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> s
Statistics<?> columnStatistics = statistics.get(column);

Domain domain;
Type type = getPrestoType(column);
Type type = getType(column);
if (columnStatistics == null || columnStatistics.isEmpty()) {
// no stats for column
domain = Domain.all(type);
Expand All @@ -101,7 +101,7 @@ public boolean matches(Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictio

for (RichColumnDescriptor column : columns) {
ParquetDictionaryDescriptor dictionaryDescriptor = dictionaries.get(column);
Domain domain = getDomain(getPrestoType(column), dictionaryDescriptor);
Domain domain = getDomain(getType(column), dictionaryDescriptor);
if (domain != null) {
domains.put(column, domain);
}
Expand All @@ -111,6 +111,21 @@ public boolean matches(Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictio
return effectivePredicate.overlaps(stripeDomain);
}

private Type getType(RichColumnDescriptor column)
{
// we look at effective predicate domain because it more accurately matches the hive column
// than the type available in the parquet metadata passed here as RichColumnDescriptor
// for example varchar(len) hive column is translated to binary and then to varchar type using parquet metadata
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example varchar(len) hive column is translated to binary.

Can you elaborate please?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a stab at breaking it down to as much detail as I can.

  1. During predicate match we need the domain type of both the column in the predicate (if specified) and column from schema to match. TupleDomainParquetPredicate#matches
  2. Now for varchar(fixedLen)effectivePredicate's domain aligns with the type specified in the hive schema.
  3. However the hive schema columns uses RichColumnDescriptor which extends ColumnDescriptor from hive. This class cannot capture the varchar(fixLen) the same way and it doesn't seem straightforward to change that.

Looking at my comment above I can see that I made the mistake of saying from parquet metadata. Let me know if the explanation above make sense and if you have any suggestion to reword that comment.
Again thanks for offering feedback on this. I think this might be affecting a lot more users than just us.

Optional<Map<ColumnDescriptor, Domain>> predicateDomains = effectivePredicate.getDomains();
if (predicateDomains.isPresent()) {
Domain domain = predicateDomains.get().get(column);
if (domain != null) {
return domain.getType();
}
}
return getPrestoType(column);
}

@VisibleForTesting
public static Domain getDomain(Type type, long rowCount, Statistics<?> statistics)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,30 @@
*/
package com.facebook.presto.hive.parquet;

import com.facebook.presto.hive.parquet.predicate.ParquetDictionaryDescriptor;
import com.facebook.presto.hive.parquet.predicate.TupleDomainParquetPredicate;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.predicate.ValueSet;
import com.facebook.presto.spi.type.VarcharType;
import org.testng.annotations.Test;
import parquet.column.ColumnDescriptor;
import parquet.column.statistics.BinaryStatistics;
import parquet.column.statistics.BooleanStatistics;
import parquet.column.statistics.DoubleStatistics;
import parquet.column.statistics.FloatStatistics;
import parquet.column.statistics.LongStatistics;
import parquet.column.statistics.Statistics;
import parquet.io.api.Binary;
import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type.Repetition;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.hive.parquet.ParquetEncoding.PLAIN_DICTIONARY;
import static com.facebook.presto.hive.parquet.predicate.TupleDomainParquetPredicate.getDomain;
import static com.facebook.presto.spi.predicate.Domain.all;
import static com.facebook.presto.spi.predicate.Domain.create;
Expand All @@ -36,9 +51,14 @@
import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
import static com.facebook.presto.spi.type.TinyintType.TINYINT;
import static com.facebook.presto.spi.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.type.VarcharType.createVarcharType;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.Float.floatToRawIntBits;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static parquet.column.statistics.Statistics.getStatsBasedOnType;

public class TestTupleDomainParquetPredicate
{
Expand Down Expand Up @@ -166,6 +186,46 @@ public void testFloat()
assertEquals(getDomain(REAL, 10, floatColumnStats(maximum, minimum)), create(ValueSet.all(REAL), false));
}

@Test
public void testMatchesWithStatistics()
{
    // A row group whose binary min/max statistics contain the predicate value
    // must not be pruned by the statistics-based matches() overload.
    RichColumnDescriptor column = getColumn();
    String value = "Test";
    TupleDomain<ColumnDescriptor> effectivePredicate = getEffectivePredicate(column, createVarcharType(255), value);
    List<RichColumnDescriptor> columns = singletonList(column);
    TupleDomainParquetPredicate predicate = new TupleDomainParquetPredicate(effectivePredicate, columns);
    // use the wildcard type to avoid a raw-type unchecked warning
    Statistics<?> stats = getStatsBasedOnType(column.getType());
    stats.setNumNulls(1L);
    // encode explicitly as UTF-8 to match utf8Slice(value) on the predicate side;
    // the no-arg getBytes() would use the platform default charset
    stats.setMinMaxFromBytes(value.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
    assertTrue(predicate.matches(2, singletonMap(column, stats)));
}

@Test
public void testMatchesWithDescriptors()
{
    // A dictionary page containing the predicate value must match via the
    // dictionary-based matches() overload.
    RichColumnDescriptor descriptor = getColumn();
    String matchingValue = "Test2";
    TupleDomainParquetPredicate parquetPredicate = new TupleDomainParquetPredicate(
            getEffectivePredicate(descriptor, createVarcharType(255), matchingValue),
            singletonList(descriptor));
    ParquetDictionaryPage dictionaryPage = new ParquetDictionaryPage(utf8Slice(matchingValue), 2, PLAIN_DICTIONARY);
    ParquetDictionaryDescriptor dictionaryDescriptor = new ParquetDictionaryDescriptor(descriptor, Optional.of(dictionaryPage));
    assertTrue(parquetPredicate.matches(singletonMap(descriptor, dictionaryDescriptor)));
}

private TupleDomain<ColumnDescriptor> getEffectivePredicate(RichColumnDescriptor column, VarcharType type, String value)
{
    // Build a single-column predicate. The key deliberately uses the plain
    // ColumnDescriptor (not the Rich subclass), mirroring what the engine passes in.
    ColumnDescriptor plainColumn = new ColumnDescriptor(column.getPath(), column.getType(), 0, 0);
    return TupleDomain.withColumnDomains(singletonMap(plainColumn, Domain.singleValue(type, utf8Slice(value))));
}

private RichColumnDescriptor getColumn()
{
    // A single optional BINARY column — the parquet representation a varchar
    // hive column would have.
    PrimitiveType binaryType = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "Test column");
    String[] path = {"path"};
    return new RichColumnDescriptor(path, binaryType, 0, 0);
}

private static FloatStatistics floatColumnStats(float minimum, float maximum)
{
FloatStatistics statistics = new FloatStatistics();
Expand Down