Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HdfsEnvironment.HdfsContext;
import com.facebook.presto.hive.HiveBasicStatistics;
import com.facebook.presto.hive.PartitionOfflineException;
import com.facebook.presto.hive.TableOfflineException;
import com.facebook.presto.spi.ErrorCodeSupplier;
Expand All @@ -25,6 +26,8 @@
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.facebook.presto.spi.type.ArrayType;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.CharType;
Expand All @@ -33,7 +36,9 @@
import com.facebook.presto.spi.type.Decimals;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.MapType;
import com.facebook.presto.spi.type.RealType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.SmallintType;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.TimestampType;
Expand All @@ -43,6 +48,9 @@
import com.facebook.presto.spi.type.VarcharType;
import com.google.common.base.CharMatcher;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Longs;
import io.airlift.slice.Slice;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -55,6 +63,8 @@
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import javax.annotation.Nullable;

import java.io.IOException;
import java.math.BigInteger;
import java.sql.Date;
Expand All @@ -67,13 +77,34 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.hive.MetastoreErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.MetastoreErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.MAX_VALUE;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.MAX_VALUE_SIZE_IN_BYTES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.MIN_VALUE;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_TRUE_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.spi.type.Chars.isCharType;
import static com.facebook.presto.spi.type.Chars.padSpaces;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.RealType.REAL;
import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.spi.type.TinyintType.TINYINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.spi.type.Varchars.isVarcharType;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Strings.padEnd;
Expand Down Expand Up @@ -108,7 +139,13 @@ public class MetastoreUtil
public static final String HIVE_DEFAULT_DYNAMIC_PARTITION = "__HIVE_DEFAULT_PARTITION__";
@SuppressWarnings("OctalInteger")
public static final FsPermission ALL_PERMISSIONS = new FsPermission((short) 0777);

private static final String PARTITION_VALUE_WILDCARD = "";
private static final String NUM_FILES = "numFiles";
private static final String NUM_ROWS = "numRows";
private static final String RAW_DATA_SIZE = "rawDataSize";
private static final String TOTAL_SIZE = "totalSize";
private static final Set<String> STATS_PROPERTIES = ImmutableSet.of(NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE);

private MetastoreUtil()
{
Expand Down Expand Up @@ -693,4 +730,103 @@ else if (type instanceof TinyintType
}
return val;
}

/**
* Hive calculates NDV considering null as a distinct value
*/
public static OptionalLong fromMetastoreDistinctValuesCount(OptionalLong distinctValuesCount, OptionalLong nullsCount, OptionalLong rowCount)
{
if (distinctValuesCount.isPresent() && nullsCount.isPresent() && rowCount.isPresent()) {
return OptionalLong.of(fromMetastoreDistinctValuesCount(distinctValuesCount.getAsLong(), nullsCount.getAsLong(), rowCount.getAsLong()));
}
return OptionalLong.empty();
}

public static long fromMetastoreDistinctValuesCount(long distinctValuesCount, long nullsCount, long rowCount)
{
long nonNullsCount = rowCount - nullsCount;
if (nullsCount > 0 && distinctValuesCount > 0) {
distinctValuesCount--;
}

// normalize distinctValuesCount in case there is a non null element
if (nonNullsCount > 0 && distinctValuesCount == 0) {
distinctValuesCount = 1;
}

// the metastore may store an estimate, so the value stored may be higher than the total number of rows
if (distinctValuesCount > nonNullsCount) {
return nonNullsCount;
}
return distinctValuesCount;
}

public static boolean isNumericType(Type type)
{
return type.equals(BIGINT) || type.equals(INTEGER) || type.equals(SMALLINT) || type.equals(TINYINT) ||
type.equals(DOUBLE) || type.equals(REAL) ||
type instanceof DecimalType;
}

public static Set<ColumnStatisticType> getSupportedColumnStatistics(Type type)
{
if (type.equals(BOOLEAN)) {
return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, NUMBER_OF_TRUE_VALUES);
}
if (isNumericType(type) || type.equals(DATE) || type.equals(TIMESTAMP)) {
// TODO #7122 support non-legacy TIMESTAMP
return ImmutableSet.of(MIN_VALUE, MAX_VALUE, NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES);
}
if (isVarcharType(type) || isCharType(type)) {
// TODO Collect MIN,MAX once it is used by the optimizer
return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, NUMBER_OF_DISTINCT_VALUES, TOTAL_SIZE_IN_BYTES, MAX_VALUE_SIZE_IN_BYTES);
}
if (type.equals(VARBINARY)) {
return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, TOTAL_SIZE_IN_BYTES, MAX_VALUE_SIZE_IN_BYTES);
}
if (type instanceof ArrayType || type instanceof RowType || type instanceof MapType) {
return ImmutableSet.of(NUMBER_OF_NON_NULL_VALUES, TOTAL_SIZE_IN_BYTES);
}
// Throwing here to make sure this method is updated when a new type is added in Hive connector
throw new IllegalArgumentException("Unsupported type: " + type);
}

public static HiveBasicStatistics getHiveBasicStatistics(Map<String, String> parameters)
{
OptionalLong numFiles = parse(parameters.get(NUM_FILES));
OptionalLong numRows = parse(parameters.get(NUM_ROWS));
OptionalLong inMemoryDataSizeInBytes = parse(parameters.get(RAW_DATA_SIZE));
OptionalLong onDiskDataSizeInBytes = parse(parameters.get(TOTAL_SIZE));
return new HiveBasicStatistics(numFiles, numRows, inMemoryDataSizeInBytes, onDiskDataSizeInBytes);
}

private static OptionalLong parse(@Nullable String parameterValue)
{
if (parameterValue == null) {
return OptionalLong.empty();
}
Long longValue = Longs.tryParse(parameterValue);
if (longValue == null || longValue < 0) {
return OptionalLong.empty();
}
return OptionalLong.of(longValue);
}

public static Map<String, String> updateStatisticsParameters(Map<String, String> parameters, HiveBasicStatistics statistics)
{
ImmutableMap.Builder<String, String> result = ImmutableMap.builder();

parameters.forEach((key, value) -> {
if (!STATS_PROPERTIES.contains(key)) {
result.put(key, value);
}
});

statistics.getFileCount().ifPresent(count -> result.put(NUM_FILES, Long.toString(count)));
statistics.getRowCount().ifPresent(count -> result.put(NUM_ROWS, Long.toString(count)));
statistics.getInMemoryDataSizeInBytes().ifPresent(size -> result.put(RAW_DATA_SIZE, Long.toString(size)));
statistics.getOnDiskDataSizeInBytes().ifPresent(size -> result.put(TOTAL_SIZE, Long.toString(size)));

return result.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,27 @@

import alluxio.client.table.TableMasterClient;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.grpc.table.ColumnStatisticsInfo;
import alluxio.grpc.table.Constraint;
import alluxio.grpc.table.layout.hive.PartitionInfo;
import com.facebook.presto.hive.HiveBasicStatistics;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreUtil;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.metastore.thrift.HiveMetastore;
import com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil;
import com.facebook.presto.spi.NotFoundException;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.RoleGrant;
Expand All @@ -45,15 +49,16 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;

import static com.facebook.presto.hive.MetastoreErrorCode.HIVE_METASTORE_ERROR;
import static com.facebook.presto.hive.metastore.MetastoreUtil.convertPredicateToParts;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveBasicStatistics;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

/**
* Implementation of the {@link HiveMetastore} interface through Alluxio.
Expand Down Expand Up @@ -108,19 +113,26 @@ public Optional<Table> getTable(String databaseName, String tableName)
@Override
public Set<ColumnStatisticType> getSupportedColumnStatistics(Type type)
{
throw new UnsupportedOperationException("getSupportedColumnStatistics is not supported in AlluxioHiveMetastore");
return MetastoreUtil.getSupportedColumnStatistics(type);
}

private Map<String, HiveColumnStatistics> groupStatisticsByColumn(List<ColumnStatisticsInfo> statistics, OptionalLong rowCount)
{
return statistics.stream()
.collect(toImmutableMap(ColumnStatisticsInfo::getColName, statisticsInfo -> AlluxioProtoUtils.fromProto(statisticsInfo.getData(), rowCount)));
}

@Override
public PartitionStatistics getTableStatistics(String databaseName, String tableName)
{
try {
Table table = getTable(databaseName, tableName).orElseThrow(() -> new PrestoException(
HIVE_METASTORE_ERROR,
String.format("Could not retrieve table %s.%s", databaseName, tableName)));
HiveBasicStatistics basicStats = ThriftMetastoreUtil.getHiveBasicStatistics(table.getParameters());
// TODO implement logic to populate Map<string, HiveColumnStatistics>
return new PartitionStatistics(basicStats, ImmutableMap.of());
Table table = getTable(databaseName, tableName).orElseThrow(
() -> new PrestoException(HIVE_METASTORE_ERROR, String.format("Could not retrieve table %s.%s", databaseName, tableName)));
HiveBasicStatistics basicStatistics = getHiveBasicStatistics(table.getParameters());
List<Column> columns = table.getPartitionColumns();
List<String> columnNames = columns.stream().map(Column::getName).collect(toImmutableList());
List<ColumnStatisticsInfo> columnStatistics = client.getTableColumnStatistics(table.getDatabaseName(), table.getTableName(), columnNames);
return new PartitionStatistics(basicStatistics, groupStatisticsByColumn(columnStatistics, basicStatistics.getRowCount()));
}
catch (Exception e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e);
Expand All @@ -130,9 +142,45 @@ public PartitionStatistics getTableStatistics(String databaseName, String tableN
@Override
public Map<String, PartitionStatistics> getPartitionStatistics(String databaseName, String tableName, Set<String> partitionNames)
{
// TODO implement partition statistics
// currently returns a map of partitionName to empty statistics to satisfy presto requirements
return partitionNames.stream().collect(toImmutableMap(identity(), (p) -> PartitionStatistics.empty()));
Table table = getTable(databaseName, tableName).orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));

Map<String, HiveBasicStatistics> partitionBasicStatistics = getPartitionsByNames(databaseName, tableName, ImmutableList.copyOf(partitionNames)).entrySet().stream()
.filter(entry -> entry.getValue().isPresent())
.collect(toImmutableMap(
entry -> MetastoreUtil.makePartName(table.getPartitionColumns(), entry.getValue().get().getValues()),
entry -> getHiveBasicStatistics(entry.getValue().get().getParameters())));

Map<String, OptionalLong> partitionRowCounts = partitionBasicStatistics.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getRowCount()));

List<String> dataColumns = table.getDataColumns().stream()
.map(Column::getName)
.collect(toImmutableList());
Map<String, List<ColumnStatisticsInfo>> columnStatisticss;
try {
columnStatisticss = client.getPartitionColumnStatistics(
table.getDatabaseName(),
table.getTableName(),
partitionBasicStatistics.keySet().stream().collect(toImmutableList()),
dataColumns);
}
catch (AlluxioStatusException e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break a line


Map<String, Map<String, HiveColumnStatistics>> partitionColumnStatistics = columnStatisticss.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> groupStatisticsByColumn(entry.getValue(), partitionRowCounts.getOrDefault(entry.getKey(), OptionalLong.empty()))));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break line after this


ImmutableMap.Builder<String, PartitionStatistics> result = ImmutableMap.builder();
for (String partitionName : partitionBasicStatistics.keySet()) {
HiveBasicStatistics basicStatistics = partitionBasicStatistics.get(partitionName);
Map<String, HiveColumnStatistics> columnStatistics = partitionColumnStatistics.getOrDefault(partitionName, ImmutableMap.of());
result.put(partitionName, new PartitionStatistics(basicStatistics, columnStatistics));
}
return result.build();
}

@Override
Expand Down
Loading