Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.hive.HiveSplit.BucketConversion;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryNotAllowedException;
import com.facebook.presto.hive.util.InternalHiveSplitFactory;
Expand Down Expand Up @@ -279,12 +280,15 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
throws IOException
{
String partitionName = partition.getHivePartition().getPartitionId();
Properties schema = getPartitionSchema(table, partition.getPartition());
Storage storage = partition.getPartition().map(Partition::getStorage).orElse(table.getStorage());
int partitionDataColumnCount = partition.getPartition()
.map(p -> p.getColumns().size())
.orElse(table.getDataColumns().size());
List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition.getPartition());

Path path = new Path(getPartitionLocation(table, partition.getPartition()));
Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path);
InputFormat<?, ?> inputFormat = getInputFormat(configuration, schema, false);
InputFormat<?, ?> inputFormat = getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), false);
FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path);
boolean s3SelectPushdownEnabled = shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition());

Expand Down Expand Up @@ -313,7 +317,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
pathDomain,
isForceLocalScheduling(session),
s3SelectPushdownEnabled,
new HiveSplitPartitionInfo(schema, path.toUri(), partitionKeys, partitionName, partition.getColumnCoercions(), Optional.empty()),
new HiveSplitPartitionInfo(storage, path.toUri(), partitionKeys, partitionName, partitionDataColumnCount, partition.getPartitionSchemaDifference(), Optional.empty()),
schedulerUsesHostAddresses);
lastResult = addSplitsToSource(targetSplits, splitFactory);
if (stopped) {
Expand Down Expand Up @@ -347,11 +351,12 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
isForceLocalScheduling(session),
s3SelectPushdownEnabled,
new HiveSplitPartitionInfo(
schema,
storage,
path.toUri(),
partitionKeys,
partitionName,
partition.getColumnCoercions(),
partitionDataColumnCount,
partition.getPartitionSchemaDifference(),
bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty()),
schedulerUsesHostAddresses);

Expand All @@ -370,6 +375,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)

// S3 Select pushdown works at the granularity of individual S3 objects,
// therefore we must not split files when it is enabled.
Properties schema = getHiveSchema(storage.getSerdeParameters(), table.getParameters());
Comment thread
arhimondr marked this conversation as resolved.
Outdated
boolean splittable = getHeaderCount(schema) == 0 && getFooterCount(schema) == 0 && !s3SelectPushdownEnabled;

// Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
Expand Down Expand Up @@ -560,14 +566,6 @@ private static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Par
return partitionKeys.build();
}

/**
 * Resolves the Hive schema for a split source: the partition's own schema when a
 * concrete partition is present, otherwise the table-level schema.
 */
private static Properties getPartitionSchema(Table table, Optional<Partition> partition)
{
    return partition
            .map(value -> getHiveSchema(value, table))
            .orElseGet(() -> getHiveSchema(table));
}

public static class BucketSplitInfo
{
private final List<HiveColumnHandle> bucketColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.predicate.TupleDomain;
Expand All @@ -21,8 +22,8 @@
import org.joda.time.DateTimeZone;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

public interface HiveBatchPageSourceFactory
{
Expand All @@ -33,7 +34,8 @@ Optional<? extends ConnectorPageSource> createPageSource(
long start,
long length,
long fileSize,
Properties schema,
Storage storage,
Map<String, String> tableParameters,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
DateTimeZone hiveStorageTimeZone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.not;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -1758,24 +1759,42 @@ public ConnectorPushdownFilterResult pushdownFilter(ConnectorSession session, Co
.map(HiveColumnHandle.class::cast)
.collect(toImmutableMap(HiveColumnHandle::getName, Functions.identity()));

SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName();
return new ConnectorPushdownFilterResult(
getTableLayout(
session,
new HiveTableLayoutHandle(
((HiveTableHandle) tableHandle).getSchemaTableName(),
ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()),
tableName,
hivePartitionResult.getPartitionColumns(),
Comment thread
arhimondr marked this conversation as resolved.
Outdated
// remove comments to optimize serialization costs
pruneColumnComments(hivePartitionResult.getDataColumns()),
hivePartitionResult.getTableParameters(),
hivePartitionResult.getPartitions(),
domainPredicate,
decomposedFilter.getRemainingExpression(),
predicateColumns,
hivePartitionResult.getEnforcedConstraint(),
hivePartitionResult.getBucketHandle(),
hivePartitionResult.getBucketFilter(),
session,
rowExpression -> rowExpressionService.formatRowExpression(session, rowExpression))),
createTableLayoutString(session, tableName, hivePartitionResult.getBucketHandle(), decomposedFilter.getRemainingExpression(), domainPredicate))),
TRUE_CONSTANT);
}

/**
 * Builds a compact, human-readable summary of a table layout for use in plan output.
 * Null-valued parts (no bucketing, trivial filter, all-domain predicate) are omitted
 * entirely so the string stays short.
 */
private String createTableLayoutString(
        ConnectorSession session,
        SchemaTableName tableName,
        Optional<HiveBucketHandle> bucketHandle,
        RowExpression remainingPredicate,
        TupleDomain<Subfield> domainPredicate)
{
    // null entries are dropped by omitNullValues() below
    Integer buckets = bucketHandle.map(HiveBucketHandle::getReadBucketCount).orElse(null);
    String filter = TRUE_CONSTANT.equals(remainingPredicate)
            ? null
            : rowExpressionService.formatRowExpression(session, remainingPredicate);
    String domains = domainPredicate.isAll() ? null : domainPredicate.toString(session);
    return toStringHelper(tableName.toString())
            .omitNullValues()
            .add("buckets", buckets)
            .add("filter", filter)
            .add("domains", domains)
            .toString();
}

private static Set<VariableReferenceExpression> extractAll(RowExpression expression)
{
ImmutableSet.Builder<VariableReferenceExpression> builder = ImmutableSet.builder();
Expand Down Expand Up @@ -1817,21 +1836,24 @@ public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session
hiveBucketHandle = Optional.of(createVirtualBucketHandle(virtualBucketCount));
}

TupleDomain<Subfield> domainPredicate = hivePartitionResult.getEffectivePredicate().transform(HiveMetadata::toSubfield);
return ImmutableList.of(new ConnectorTableLayoutResult(
getTableLayout(
session,
new HiveTableLayoutHandle(
handle.getSchemaTableName(),
ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()),
hivePartitionResult.getPartitionColumns(),
// remove comments to optimize serialization costs
pruneColumnComments(hivePartitionResult.getDataColumns()),
hivePartitionResult.getTableParameters(),
hivePartitionResult.getPartitions(),
hivePartitionResult.getEffectivePredicate().transform(HiveMetadata::toSubfield),
domainPredicate,
TRUE_CONSTANT,
predicateColumns,
hivePartitionResult.getEnforcedConstraint(),
hiveBucketHandle,
hivePartitionResult.getBucketFilter(),
session,
rowExpression -> rowExpressionService.formatRowExpression(session, rowExpression))),
createTableLayoutString(session, handle.getSchemaTableName(), hivePartitionResult.getBucketHandle(), TRUE_CONSTANT, domainPredicate))),
hivePartitionResult.getUnenforcedConstraint()));
}

Expand All @@ -1857,11 +1879,18 @@ private boolean isPushdownFilterEnabled(ConnectorSession session, ConnectorTable
return false;
}

/**
 * Returns a copy of the given columns with every comment dropped.
 * Comments play no role in planning and only inflate the serialized size of the
 * table layout handle, so they are stripped before the handle is built.
 */
private List<Column> pruneColumnComments(List<Column> columns)
{
    return columns.stream()
            .map(c -> new Column(c.getName(), c.getType(), Optional.empty()))
            .collect(toImmutableList());
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle layoutHandle)
{
HiveTableLayoutHandle hiveLayoutHandle = (HiveTableLayoutHandle) layoutHandle;
List<ColumnHandle> partitionColumns = hiveLayoutHandle.getPartitionColumns();
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(hiveLayoutHandle.getPartitionColumns());
List<HivePartition> partitions = hiveLayoutHandle.getPartitions().get();

TupleDomain<ColumnHandle> predicate = createPredicate(partitionColumns, partitions);
Expand Down Expand Up @@ -1997,15 +2026,16 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se
return new HiveTableLayoutHandle(
hiveLayoutHandle.getSchemaTableName(),
hiveLayoutHandle.getPartitionColumns(),
hiveLayoutHandle.getDataColumns(),
hiveLayoutHandle.getTableParameters(),
hiveLayoutHandle.getPartitions().get(),
hiveLayoutHandle.getDomainPredicate(),
hiveLayoutHandle.getRemainingPredicate(),
hiveLayoutHandle.getPredicateColumns(),
hiveLayoutHandle.getPartitionColumnPredicate(),
Optional.of(new HiveBucketHandle(bucketHandle.getColumns(), bucketHandle.getTableBucketCount(), hivePartitioningHandle.getBucketCount())),
hiveLayoutHandle.getBucketFilter(),
session,
rowExpression -> rowExpressionService.formatRowExpression(session, rowExpression));
hiveLayoutHandle.getLayoutString());
}

@Override
Expand Down
Loading