Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import java.util.stream.Collectors;

import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.MetadataUtils.createPredicate;
import static com.facebook.presto.hive.MetadataUtils.getCombinedRemainingPredicate;
import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
Expand Down Expand Up @@ -216,13 +215,14 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
IcebergTableHandle handle = (IcebergTableHandle) table;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(getPartitionKeyColumnHandles(icebergTable, typeManager))));
List<IcebergColumnHandle> partitionColumns = getPartitionKeyColumnHandles(icebergTable, typeManager);
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns)));
Optional<Set<IcebergColumnHandle>> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet()));

ConnectorTableLayout layout = getTableLayout(
session,
new IcebergTableLayoutHandle.Builder()
.setPartitionColumns(ImmutableList.copyOf(getPartitionKeyColumnHandles(icebergTable, typeManager)))
.setPartitionColumns(ImmutableList.copyOf(partitionColumns))
.setDataColumns(toHiveColumns(icebergTable.schema().columns()))
.setDomainPredicate(constraint.getSummary().transform(IcebergAbstractMetadata::toSubfield))
.setRemainingPredicate(TRUE_CONSTANT)
Expand Down Expand Up @@ -258,59 +258,52 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa

Table icebergTable = getIcebergTable(session, tableHandle.getSchemaTableName());
validateTableMode(session, icebergTable);

List<ColumnHandle> partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns());
if (!isPushdownFilterEnabled(session)) {
return new ConnectorTableLayout(handle);
}

if (!icebergTableLayoutHandle.getPartitions().isPresent()) {
return new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
TupleDomain.none(),
icebergTableLayoutHandle.getPartitionColumnPredicate(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
Optional.empty());
}
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns());
List<HivePartition> partitions = icebergTableLayoutHandle.getPartitions().get();

Optional<DiscretePredicates> discretePredicates = getDiscretePredicates(partitionColumns, partitions);

TupleDomain<ColumnHandle> predicate;
RowExpression subfieldPredicate;
if (isPushdownFilterEnabled(session)) {
Map<String, ColumnHandle> predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet()
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Optional<List<HivePartition>> partitions = icebergTableLayoutHandle.getPartitions();
Optional<DiscretePredicates> discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts));

predicate = getPredicate(icebergTableLayoutHandle, partitionColumns, partitions, predicateColumns);
Map<String, ColumnHandle> predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet()
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Optional<TupleDomain<ColumnHandle>> predicate = partitions.map(parts -> getPredicate(icebergTableLayoutHandle, partitionColumns, parts, predicateColumns));
// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType()));

// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType()));

subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService);
}
else {
predicate = createPredicate(partitionColumns, partitions);
subfieldPredicate = TRUE_CONSTANT;
}
RowExpression subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService);

// combine subfieldPredicate with remainingPredicate
RowExpression combinedRemainingPredicate = getCombinedRemainingPredicate(icebergTableLayoutHandle, subfieldPredicate);

return new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
predicate,
Optional.empty(),
Optional.empty(),
discretePredicates,
ImmutableList.of(),
Optional.of(combinedRemainingPredicate));
return predicate.map(pred -> new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
pred,
Optional.empty(),
Optional.empty(),
discretePredicates,
ImmutableList.of(),
Optional.of(combinedRemainingPredicate)))
.orElseGet(() -> new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
TupleDomain.none(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
Optional.empty()));
}

protected Optional<SystemTable> getIcebergSystemTable(SchemaTableName tableName, Table table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,21 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
.setRowCount(Estimate.of(0))
.build();
}
// the total record count for the whole table
Optional<Long> totalRecordCount = Optional.of(intersection)
.filter(domain -> !domain.isAll())
.map(domain -> getDataTableSummary(tableHandle, ImmutableList.of(), TupleDomain.all(), idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields).getRecordCount());

double recordCount = summary.getRecordCount();
TableStatistics.Builder result = TableStatistics.builder();
result.setRowCount(Estimate.of(recordCount));

Map<Integer, ColumnStatistics.Builder> tableStats = getClosestStatisticsFileForSnapshot(tableHandle)
.map(this::loadStatisticsFile).orElseGet(Collections::emptyMap);
// scale all NDV values loaded from puffin files based on row count
totalRecordCount.ifPresent(fullTableRecordCount -> tableStats.forEach((id, stat) ->
stat.setDistinctValuesCount(stat.getDistinctValuesCount().map(value -> value * recordCount / fullTableRecordCount))));

for (IcebergColumnHandle columnHandle : selectedColumns) {
int fieldId = columnHandle.getId();
ColumnStatistics.Builder columnBuilder = tableStats.getOrDefault(fieldId, ColumnStatistics.builder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -117,7 +118,7 @@
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public class IcebergDistributedTestBase
public abstract class IcebergDistributedTestBase
extends AbstractTestDistributedQueries
{
private final CatalogType catalogType;
Expand Down Expand Up @@ -1245,6 +1246,63 @@ public void testEqualityDeletesWithHiddenPartitionsEvolution(String fileFormat,
assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001', NULL, NULL), (3, '1003', NULL, NULL), (6, '1004', 1, NULL), (6, '1006', 2, 'th002')");
}

@Test
public void testPartShowStatsWithFilters()
{
    // 10 rows, 7 distinct values; the value 7 appears in 4 of the 10 rows.
    assertQuerySucceeds("CREATE TABLE showstatsfilters (i int) WITH (partitioning = ARRAY['i'])");
    assertQuerySucceeds("INSERT INTO showstatsfilters VALUES 1, 2, 3, 4, 5, 6, 7, 7, 7, 7");
    assertQuerySucceeds("ANALYZE showstatsfilters");

    // Unfiltered: exact NDV, null fraction, and min/max from the analyzed table.
    MaterializedResult stats = getQueryRunner().execute("SHOW STATS for showstatsfilters");
    MaterializedRow row = stats.getMaterializedRows().get(0);
    assertEquals(row.getField(2), 7.0); // distinct values
    assertEquals(row.getField(3), 0.0); // nulls fraction
    assertEquals(row.getField(5), "1"); // low value
    assertEquals(row.getField(6), "7"); // high value

    // Equality predicate: range collapses to [7, 7]; NDV scaled by selectivity (4 of 10 rows match).
    stats = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i = 7)");
    row = stats.getMaterializedRows().get(0);
    assertEquals(row.getField(5), "7"); // low value
    assertEquals(row.getField(6), "7"); // high value
    assertEquals(row.getField(3), 0.0); // nulls fraction
    assertEquals((double) row.getField(2), 7.0d * (4.0d / 10.0d), 1E-8); // distinct values

    // Less-than predicate: range narrows to [1, 6]; NDV scaled by 6 of 10 matching rows.
    stats = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i < 7)");
    row = stats.getMaterializedRows().get(0);
    assertEquals(row.getField(5), "1"); // low value
    assertEquals(row.getField(6), "6"); // high value
    assertEquals(row.getField(3), 0.0); // nulls fraction
    assertEquals((double) row.getField(2), 7.0d * (6.0d / 10.0d), 1E-8); // distinct values

    // Less-than-or-equal matches every row, so the full-table stats come back unscaled.
    stats = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i <= 7)");
    row = stats.getMaterializedRows().get(0);
    assertEquals(row.getField(5), "1"); // low value
    assertEquals(row.getField(6), "7"); // high value
    assertEquals(row.getField(3), 0.0); // nulls fraction
    assertEquals(row.getField(2), 7.0d); // distinct values

    // Greater-than predicate matches no rows: every statistic is absent.
    stats = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i > 7)");
    row = stats.getMaterializedRows().get(0);
    assertEquals(row.getField(5), null); // low value
    assertEquals(row.getField(6), null); // high value
    assertEquals(row.getField(3), null); // nulls fraction
    assertEquals(row.getField(2), null); // distinct values

    // Greater-than-or-equal: only the 7s qualify, same scaling as the equality case.
    stats = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i >= 7)");
    row = stats.getMaterializedRows().get(0);
    assertEquals(row.getField(5), "7"); // low value
    assertEquals(row.getField(6), "7"); // high value
    assertEquals(row.getField(3), 0.0); // nulls fraction
    assertEquals((double) row.getField(2), 7.0d * (4.0d / 10.0d), 1E-8); // distinct values

    assertQuerySucceeds("DROP TABLE showstatsfilters");
}

private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
{
// check delete file list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public void testStatsByDistance()
// ignore because HMS doesn't support statistics versioning
}

/**
 * Deliberate no-op override: disables the inherited partitioned-table
 * SHOW STATS test for this catalog.
 */
@Override
public void testPartShowStatsWithFilters()
{
// Hive doesn't support returning statistics on partitioned tables
}

@Override
protected Table loadTable(String tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,33 @@ public Builder setNullsFraction(Estimate nullsFraction)
return this;
}

/**
 * Returns the nulls-fraction estimate currently held by this builder.
 *
 * @return the nulls fraction {@link Estimate}
 */
public Estimate getNullsFraction()
{
    return this.nullsFraction;
}

/**
 * Sets the estimated number of distinct values for the column.
 *
 * @param distinctValuesCount the distinct-values estimate; must not be null
 * @return this builder, for call chaining
 * @throws NullPointerException if {@code distinctValuesCount} is null
 */
public Builder setDistinctValuesCount(Estimate distinctValuesCount)
{
    requireNonNull(distinctValuesCount, "distinctValuesCount is null");
    this.distinctValuesCount = distinctValuesCount;
    return this;
}

/**
 * Returns the distinct-values estimate currently held by this builder.
 *
 * @return the distinct values count {@link Estimate}
 */
public Estimate getDistinctValuesCount()
{
    return this.distinctValuesCount;
}

/**
 * Sets the estimated data size for the column.
 *
 * @param dataSize the data-size estimate; must not be null
 * @return this builder, for call chaining
 * @throws NullPointerException if {@code dataSize} is null
 */
public Builder setDataSize(Estimate dataSize)
{
    requireNonNull(dataSize, "dataSize is null");
    this.dataSize = dataSize;
    return this;
}

/**
 * Returns the data-size estimate currently held by this builder.
 *
 * @return the data size {@link Estimate}
 */
public Estimate getDataSize()
{
    return this.dataSize;
}

public Builder setRange(DoubleRange range)
{
this.range = Optional.of(requireNonNull(range, "range is null"));
Expand Down