Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.OptionalLong;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class HiveBasicStatistics
Expand Down Expand Up @@ -49,13 +48,9 @@ public HiveBasicStatistics(
OptionalLong onDiskDataSizeInBytes)
{
this.fileCount = requireNonNull(fileCount, "fileCount is null");
fileCount.ifPresent(count -> checkArgument(count >= 0, "fileCount is negative: %d", count));
this.rowCount = requireNonNull(rowCount, "rowCount is null");
rowCount.ifPresent(count -> checkArgument(count >= 0, "rowCount is negative: %d", count));
this.inMemoryDataSizeInBytes = requireNonNull(inMemoryDataSizeInBytes, "inMemoryDataSizeInBytes is null");
inMemoryDataSizeInBytes.ifPresent(size -> checkArgument(size >= 0, "inMemoryDataSizeInBytes is negative: %d", size));
this.onDiskDataSizeInBytes = requireNonNull(onDiskDataSizeInBytes, "onDiskDataSizeInBytes is null");
onDiskDataSizeInBytes.ifPresent(size -> checkArgument(size >= 0, "onDiskDataSizeInBytes is negative: %d", size));
}

public OptionalLong getFileCount()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public class HiveClientConfig

private boolean tableStatisticsEnabled = true;
private int partitionStatisticsSampleSize = 100;
private boolean ignoreCorruptedStatistics;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cmt msg

If corruption is detected and the session property is set to true,
the statistics provider logs the corruption details and returns empty statistics.

This isn't true yet, since no one reads the new flag. Maybe squash this commit with the one that uses it? Or amend the commit message to reflect this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just going to squash it

private boolean collectColumnStatisticsOnWrite;

public int getMaxInitialSplits()
Expand Down Expand Up @@ -1094,6 +1095,19 @@ public HiveClientConfig setPartitionStatisticsSampleSize(int partitionStatistics
return this;
}

/**
 * Returns whether corrupted partition/table statistics should be ignored
 * (rather than failing the query). Defaults to {@code false}.
 */
public boolean isIgnoreCorruptedStatistics()
{
return ignoreCorruptedStatistics;
}

// Bound to the "hive.ignore-corrupted-statistics" configuration property;
// also surfaced as the "ignore_corrupted_statistics" session property.
@Config("hive.ignore-corrupted-statistics")
@ConfigDescription("Ignore corrupted statistics rather than failing")
public HiveClientConfig setIgnoreCorruptedStatistics(boolean ignoreCorruptedStatistics)
{
this.ignoreCorruptedStatistics = ignoreCorruptedStatistics;
return this;
}

public boolean isCollectColumnStatisticsOnWrite()
{
return collectColumnStatisticsOnWrite;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public enum HiveErrorCode
HIVE_TABLE_NOT_READABLE(34, USER_ERROR),
HIVE_TABLE_DROPPED_DURING_QUERY(35, EXTERNAL),
// HIVE_TOO_MANY_BUCKET_SORT_FILES(36) is deprecated
HIVE_CORRUPTED_COLUMN_STATISTICS(37, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static com.facebook.presto.hive.HivePartitionManager.extractPartitionKeyValues;
import static com.facebook.presto.hive.HivePartitionManager.extractPartitionValues;
import static com.facebook.presto.hive.HiveSessionProperties.getHiveStorageFormat;
import static com.facebook.presto.hive.HiveSessionProperties.isBucketExecutionEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
Expand Down Expand Up @@ -180,7 +180,6 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static com.facebook.presto.spi.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.spi.statistics.TableStatistics.EMPTY_STATISTICS;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -525,14 +524,16 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint)
{
if (!isStatisticsEnabled(session)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain in commit message what you change and motivation behind this

return EMPTY_STATISTICS;
return TableStatistics.empty();
}
List<HivePartition> hivePartitions = getPartitionsAsList(tableHandle, constraint);
Map<String, ColumnHandle> tableColumns = getColumnHandles(session, tableHandle)
Map<String, ColumnHandle> columns = getColumnHandles(session, tableHandle)
.entrySet().stream()
.filter(entry -> !((HiveColumnHandle) entry.getValue()).isHidden())
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
return hiveStatisticsProvider.getTableStatistics(session, tableHandle, hivePartitions, tableColumns);
Map<String, Type> columnTypes = columns.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType()));
List<HivePartition> partitions = getPartitionsAsList(tableHandle, constraint);
return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions);
}

private List<HivePartition> getPartitionsAsList(ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint)
Expand Down Expand Up @@ -1272,7 +1273,7 @@ private Partition buildPartitionObject(ConnectorSession session, Table table, Pa
.setDatabaseName(table.getDatabaseName())
.setTableName(table.getTableName())
.setColumns(table.getDataColumns())
.setValues(extractPartitionKeyValues(partitionUpdate.getName()))
.setValues(extractPartitionValues(partitionUpdate.getName()))
.setParameters(ImmutableMap.<String, String>builder()
.put(PRESTO_VERSION_NAME, prestoVersion)
.put(PRESTO_QUERY_ID_NAME, session.getQueryId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public HiveMetadata get()
partitionUpdateCodec,
typeTranslator,
prestoVersion,
new MetastoreHiveStatisticsProvider(typeManager, metastore, timeZone),
new MetastoreHiveStatisticsProvider(metastore),
maxPartitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,27 +181,22 @@ private Optional<HivePartition> parseValuesAndFilterPartition(
List<Type> partitionColumnTypes,
Constraint<ColumnHandle> constraint)
{
List<String> keys = extractPartitionKeyValues(partitionId);
HivePartition partition = parsePartition(tableName, partitionId, partitionColumns, partitionColumnTypes, timeZone);

Map<ColumnHandle, Domain> domains = constraint.getSummary().getDomains().get();
ImmutableMap.Builder<ColumnHandle, NullableValue> builder = ImmutableMap.builder();
for (int i = 0; i < partitionColumns.size(); i++) {
HiveColumnHandle column = partitionColumns.get(i);
NullableValue parsedValue = parsePartitionValue(partitionId, keys.get(i), partitionColumnTypes.get(i), timeZone);

for (HiveColumnHandle column : partitionColumns) {
NullableValue value = partition.getKeys().get(column);
Domain allowedDomain = domains.get(column);
if (allowedDomain != null && !allowedDomain.includesNullableValue(parsedValue.getValue())) {
if (allowedDomain != null && !allowedDomain.includesNullableValue(value.getValue())) {
return Optional.empty();
}
builder.put(column, parsedValue);
}
Map<ColumnHandle, NullableValue> values = builder.build();

if (constraint.predicate().isPresent() && !constraint.predicate().get().test(values)) {
if (constraint.predicate().isPresent() && !constraint.predicate().get().test(partition.getKeys())) {
return Optional.empty();
}

return Optional.of(new HivePartition(tableName, partitionId, values));
return Optional.of(partition);
}

private Table getTable(SemiTransactionalHiveMetastore metastore, SchemaTableName tableName)
Expand Down Expand Up @@ -282,7 +277,25 @@ else if (type instanceof TinyintType
.orElseThrow(() -> new TableNotFoundException(tableName));
}

public static List<String> extractPartitionKeyValues(String partitionName)
/**
 * Builds a {@link HivePartition} from a Hive partition name by splitting the
 * name into its raw values and parsing each value with the type of the
 * corresponding partition column.
 */
public static HivePartition parsePartition(
        SchemaTableName tableName,
        String partitionName,
        List<HiveColumnHandle> partitionColumns,
        List<Type> partitionColumnTypes,
        DateTimeZone timeZone)
{
    List<String> rawValues = extractPartitionValues(partitionName);
    ImmutableMap.Builder<ColumnHandle, NullableValue> keys = ImmutableMap.builder();
    for (int index = 0; index < partitionColumns.size(); index++) {
        HiveColumnHandle partitionColumn = partitionColumns.get(index);
        Type columnType = partitionColumnTypes.get(index);
        keys.put(partitionColumn, parsePartitionValue(partitionName, rawValues.get(index), columnType, timeZone));
    }
    return new HivePartition(tableName, partitionName, keys.build());
}

public static List<String> extractPartitionValues(String partitionName)
{
ImmutableList.Builder<String> values = ImmutableList.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public final class HiveSessionProperties
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String STATISTICS_ENABLED = "statistics_enabled";
private static final String PARTITION_STATISTICS_SAMPLE_SIZE = "partition_statistics_sample_size";
private static final String IGNORE_CORRUPTED_STATISTICS = "ignore_corrupted_statistics";
private static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write";

private final List<PropertyMetadata<?>> sessionProperties;
Expand Down Expand Up @@ -256,6 +257,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Maximum sample size of the partitions column statistics",
hiveClientConfig.getPartitionStatisticsSampleSize(),
false),
booleanProperty(
IGNORE_CORRUPTED_STATISTICS,
"Experimental: Ignore corrupted statistics rather than failing",
hiveClientConfig.isIgnoreCorruptedStatistics(),
false),
booleanProperty(
COLLECT_COLUMN_STATISTICS_ON_WRITE,
"Experimental: Enables automatic column level statistics collection on write",
Expand Down Expand Up @@ -437,6 +443,11 @@ public static int getPartitionStatisticsSampleSize(ConnectorSession session)
return size;
}

/**
 * Reads the {@code ignore_corrupted_statistics} session property
 * (seeded from {@code hive.ignore-corrupted-statistics} in HiveClientConfig).
 */
public static boolean isIgnoreCorruptedStatistics(ConnectorSession session)
{
return session.getProperty(IGNORE_CORRUPTED_STATISTICS, Boolean.class);
}

public static boolean isCollectColumnStatisticsOnWrite(ConnectorSession session)
{
return session.getProperty(COLLECT_COLUMN_STATISTICS_ON_WRITE, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,32 @@ public String toString()
.add("columnStatistics", columnStatistics)
.toString();
}

/**
 * Returns a new {@link Builder}, initialized with empty basic statistics
 * and no column statistics.
 */
public static Builder builder()
{
return new Builder();
}

/**
 * Builder for {@code PartitionStatistics}. Both components default to empty:
 * unknown basic statistics and an empty column-statistics map.
 */
public static class Builder
{
    private HiveBasicStatistics basicStatistics = HiveBasicStatistics.createEmptyStatistics();
    private Map<String, HiveColumnStatistics> columnStatistics = ImmutableMap.of();

    public Builder setBasicStatistics(HiveBasicStatistics basicStatistics)
    {
        requireNonNull(basicStatistics, "basicStatistics is null");
        this.basicStatistics = basicStatistics;
        return this;
    }

    public Builder setColumnStatistics(Map<String, HiveColumnStatistics> columnStatistics)
    {
        requireNonNull(columnStatistics, "columnStatistics is null");
        // Defensive copy: the builder never aliases the caller's map.
        this.columnStatistics = ImmutableMap.copyOf(columnStatistics);
        return this;
    }

    public PartitionStatistics build()
    {
        return new PartitionStatistics(basicStatistics, columnStatistics);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_CORRUPTED_COLUMN_STATISTICS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
Expand Down Expand Up @@ -1083,13 +1084,7 @@ private void prepareAlterPartition(HdfsContext context, PartitionAndMore partiti
format("The partition that this transaction modified was deleted in another transaction. %s %s", partition.getTableName(), partition.getValues()));
}
String partitionName = getPartitionName(partition.getDatabaseName(), partition.getTableName(), partition.getValues());
PartitionStatistics oldPartitionStatistics = delegate.getPartitionStatistics(partition.getDatabaseName(), partition.getTableName(), ImmutableSet.of(partitionName))
.get(partitionName);
if (oldPartitionStatistics == null) {
throw new PrestoException(
TRANSACTION_CONFLICT,
format("The partition that this transaction modified was deleted in another transaction. %s %s", partition.getTableName(), partition.getValues()));
}
PartitionStatistics oldPartitionStatistics = getExistingPartitionStatistics(partition, partitionName);
String oldPartitionLocation = oldPartition.get().getStorage().getLocation();
Path oldPartitionPath = new Path(oldPartitionLocation);

Expand Down Expand Up @@ -1135,6 +1130,32 @@ private void prepareAlterPartition(HdfsContext context, PartitionAndMore partiti
new PartitionWithStatistics(oldPartition.get(), partitionName, oldPartitionStatistics)));
}

/**
 * Fetches the current statistics for the given partition from the metastore.
 * Throws TRANSACTION_CONFLICT if the partition no longer exists; if the
 * stored statistics are corrupted (HIVE_CORRUPTED_COLUMN_STATISTICS), logs a
 * warning and falls back to empty statistics instead of failing.
 */
private PartitionStatistics getExistingPartitionStatistics(Partition partition, String partitionName)
{
    try {
        PartitionStatistics existingStatistics = delegate.getPartitionStatistics(partition.getDatabaseName(), partition.getTableName(), ImmutableSet.of(partitionName))
                .get(partitionName);
        if (existingStatistics == null) {
            throw new PrestoException(
                    TRANSACTION_CONFLICT,
                    format("The partition that this transaction modified was deleted in another transaction. %s %s", partition.getTableName(), partition.getValues()));
        }
        return existingStatistics;
    }
    catch (PrestoException e) {
        // Only corrupted-statistics failures are tolerated; anything else
        // (including the TRANSACTION_CONFLICT thrown above) propagates.
        if (!e.getErrorCode().equals(HIVE_CORRUPTED_COLUMN_STATISTICS.toErrorCode())) {
            throw e;
        }
        log.warn(
                e,
                "Corrupted statistics found when altering partition. Table: %s.%s. Partition: %s",
                partition.getDatabaseName(),
                partition.getTableName(),
                partition.getValues());
        return PartitionStatistics.empty();
    }
}

private void prepareAddPartition(HdfsContext context, PartitionAndMore partitionAndMore)
{
deleteOnly = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@

import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static com.facebook.presto.hive.HivePartitionManager.extractPartitionKeyValues;
import static com.facebook.presto.hive.HivePartitionManager.extractPartitionValues;
import static com.facebook.presto.hive.HiveUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.Database.DEFAULT_DATABASE_NAME;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP;
Expand Down Expand Up @@ -302,7 +302,7 @@ public synchronized Map<String, PartitionStatistics> getPartitionStatistics(Stri
Table table = getRequiredTable(databaseName, tableName);
ImmutableMap.Builder<String, PartitionStatistics> statistics = ImmutableMap.builder();
for (String partitionName : partitionNames) {
List<String> partitionValues = extractPartitionKeyValues(partitionName);
List<String> partitionValues = extractPartitionValues(partitionName);
Path partitionDirectory = getPartitionMetadataDirectory(table, ImmutableList.copyOf(partitionValues));
PartitionMetadata partitionMetadata = readSchemaFile("partition", partitionDirectory, partitionCodec)
.orElseThrow(() -> new PartitionNotFoundException(new SchemaTableName(databaseName, tableName), partitionValues));
Expand Down Expand Up @@ -352,7 +352,7 @@ public synchronized void updatePartitionStatistics(String databaseName, String t
PartitionStatistics updatedStatistics = update.apply(originalStatistics);

Table table = getRequiredTable(databaseName, tableName);
List<String> partitionValues = extractPartitionKeyValues(partitionName);
List<String> partitionValues = extractPartitionValues(partitionName);
Path partitionDirectory = getPartitionMetadataDirectory(table, partitionValues);
PartitionMetadata partitionMetadata = readSchemaFile("partition", partitionDirectory, partitionCodec)
.orElseThrow(() -> new PartitionNotFoundException(new SchemaTableName(databaseName, tableName), partitionValues));
Expand Down
Loading