From 1402ae9c492aa503c15bbe0f3286b7c26de8ce7a Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Wed, 25 Sep 2019 15:15:42 -0400 Subject: [PATCH 1/2] Refactor HiveTableLayoutHandle Generate table layout name outside of the HiveTableLayoutHandle constructor. --- .../facebook/presto/hive/HiveMetadata.java | 31 +++++++++++---- .../presto/hive/HiveTableLayoutHandle.java | 38 ++++++------------- .../presto/hive/AbstractTestHiveClient.java | 10 ++--- .../presto/hive/TestHivePageSink.java | 3 +- 4 files changed, 41 insertions(+), 41 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 976506ea5bbcf..9add5067ccba8 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -220,6 +220,7 @@ import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT; import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Predicates.not; import static com.google.common.base.Verify.verify; @@ -1758,11 +1759,12 @@ public ConnectorPushdownFilterResult pushdownFilter(ConnectorSession session, Co .map(HiveColumnHandle.class::cast) .collect(toImmutableMap(HiveColumnHandle::getName, Functions.identity())); + SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName(); return new ConnectorPushdownFilterResult( getTableLayout( session, new HiveTableLayoutHandle( - ((HiveTableHandle) tableHandle).getSchemaTableName(), + tableName, ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()), hivePartitionResult.getPartitions(), domainPredicate, @@ -1771,11 +1773,25 @@ public ConnectorPushdownFilterResult pushdownFilter(ConnectorSession session, Co hivePartitionResult.getEnforcedConstraint(), hivePartitionResult.getBucketHandle(), hivePartitionResult.getBucketFilter(), - session, - rowExpression -> rowExpressionService.formatRowExpression(session, rowExpression))), + createTableLayoutString(session, tableName, hivePartitionResult.getBucketHandle(), decomposedFilter.getRemainingExpression(), domainPredicate))), TRUE_CONSTANT); } + private String createTableLayoutString( + ConnectorSession session, + SchemaTableName tableName, + Optional bucketHandle, + RowExpression remainingPredicate, + TupleDomain domainPredicate) + { + return toStringHelper(tableName.toString()) + .omitNullValues() + .add("buckets", bucketHandle.map(HiveBucketHandle::getReadBucketCount).orElse(null)) + .add("filter", TRUE_CONSTANT.equals(remainingPredicate) ? null : rowExpressionService.formatRowExpression(session, remainingPredicate)) + .add("domains", domainPredicate.isAll() ? null : domainPredicate.toString(session)) + .toString(); + } + private static Set extractAll(RowExpression expression) { ImmutableSet.Builder builder = ImmutableSet.builder(); @@ -1817,6 +1833,7 @@ public List getTableLayouts(ConnectorSession session hiveBucketHandle = Optional.of(createVirtualBucketHandle(virtualBucketCount)); } + TupleDomain domainPredicate = hivePartitionResult.getEffectivePredicate().transform(HiveMetadata::toSubfield); return ImmutableList.of(new ConnectorTableLayoutResult( getTableLayout( session, @@ -1824,14 +1841,13 @@ public List getTableLayouts(ConnectorSession session handle.getSchemaTableName(), ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()), hivePartitionResult.getPartitions(), - hivePartitionResult.getEffectivePredicate().transform(HiveMetadata::toSubfield), + domainPredicate, TRUE_CONSTANT, predicateColumns, hivePartitionResult.getEnforcedConstraint(), hiveBucketHandle, hivePartitionResult.getBucketFilter(), - session, - rowExpression -> rowExpressionService.formatRowExpression(session, rowExpression))), + createTableLayoutString(session, handle.getSchemaTableName(), hivePartitionResult.getBucketHandle(), TRUE_CONSTANT, domainPredicate))), hivePartitionResult.getUnenforcedConstraint())); } @@ -2004,8 +2020,7 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se hiveLayoutHandle.getPartitionColumnPredicate(), Optional.of(new HiveBucketHandle(bucketHandle.getColumns(), bucketHandle.getTableBucketCount(), hivePartitioningHandle.getBucketCount())), hiveLayoutHandle.getBucketFilter(), - session, - rowExpression -> rowExpressionService.formatRowExpression(session, rowExpression)); + hiveLayoutHandle.getLayoutString()); } @Override diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java index 3d1a4dc55da26..4af70f61c3f35 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java @@ -15,7 +15,6 @@ import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter; import com.facebook.presto.spi.ColumnHandle; -import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.Subfield; @@ -31,10 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; -import static com.facebook.presto.spi.relation.LogicalRowExpressions.TRUE_CONSTANT; -import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; public final class HiveTableLayoutHandle @@ -48,13 +44,12 @@ public final class HiveTableLayoutHandle private final TupleDomain partitionColumnPredicate; private final Optional bucketHandle; private final Optional bucketFilter; + private final String layoutString; // coordinator-only properties @Nullable private final List partitions; - private final String layoutString; - @JsonCreator public HiveTableLayoutHandle( @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @@ -64,7 +59,8 @@ public HiveTableLayoutHandle( @JsonProperty("predicateColumns") Map predicateColumns, @JsonProperty("partitionColumnPredicate") TupleDomain partitionColumnPredicate, @JsonProperty("bucketHandle") Optional bucketHandle, - @JsonProperty("bucketFilter") Optional bucketFilter) + @JsonProperty("bucketFilter") Optional bucketFilter, + @JsonProperty("layoutString") String layoutString) { this.schemaTableName = requireNonNull(schemaTableName, "table is null"); this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null")); @@ -75,12 +71,7 @@ public HiveTableLayoutHandle( this.partitions = null; this.bucketHandle = requireNonNull(bucketHandle, "bucketHandle is null"); this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null"); - - // on a worker - layoutString = toStringHelper(schemaTableName.toString()) - .omitNullValues() - .add("buckets", bucketHandle.map(HiveBucketHandle::getReadBucketCount).orElse(null)) - .toString(); + this.layoutString = requireNonNull(layoutString, "layoutString is null"); } public HiveTableLayoutHandle( @@ -93,8 +84,7 @@ public HiveTableLayoutHandle( TupleDomain partitionColumnPredicate, Optional bucketHandle, Optional bucketFilter, - ConnectorSession session, - Function rowExpressionFormatter) + String layoutString) { this.schemaTableName = requireNonNull(schemaTableName, "table is null"); this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null")); @@ -105,17 +95,7 @@ public HiveTableLayoutHandle( this.partitionColumnPredicate = requireNonNull(partitionColumnPredicate, "partitionColumnPredicate is null"); this.bucketHandle = requireNonNull(bucketHandle, "bucketHandle is null"); this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null"); - - // on coordinator - requireNonNull(session, "session is null"); - requireNonNull(rowExpressionFormatter, "rowExpressionFormatter is null"); - - layoutString = toStringHelper(schemaTableName.toString()) - .omitNullValues() - .add("buckets", bucketHandle.map(HiveBucketHandle::getReadBucketCount).orElse(null)) - .add("filter", TRUE_CONSTANT.equals(remainingPredicate) ? null : rowExpressionFormatter.apply(remainingPredicate)) - .add("domains", domainPredicate.isAll() ? null : domainPredicate.toString(session)) - .toString(); + this.layoutString = requireNonNull(layoutString, "layoutString is null"); } @JsonProperty @@ -177,6 +157,12 @@ public Optional getBucketFilter() return bucketFilter; } + @JsonProperty + public String getLayoutString() + { + return layoutString; + } + @Override public String toString() { diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 4920c4190714c..35533c4dbb9ac 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -669,8 +669,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon TupleDomain.all(), Optional.empty(), Optional.empty(), - SESSION, - rowExpression -> ROW_EXPRESSION_SERVICE.formatRowExpression(SESSION, rowExpression)); + "layout"); dsColumn = new HiveColumnHandle("ds", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), -1, PARTITION_KEY, Optional.empty()); fileFormatColumn = new HiveColumnHandle("file_format", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), -1, PARTITION_KEY, Optional.empty()); @@ -714,7 +713,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon TupleDomain domainPredicate = tupleDomain.transform(HiveColumnHandle.class::cast) .transform(column -> new Subfield(column.getName(), ImmutableList.of())); tableLayout = new ConnectorTableLayout( - new HiveTableLayoutHandle(tablePartitionFormat, partitionColumns, partitions, domainPredicate, TRUE_CONSTANT, ImmutableMap.of(dsColumn.getName(), dsColumn), tupleDomain, Optional.empty(), Optional.empty(), SESSION, rowExpression -> ROW_EXPRESSION_SERVICE.formatRowExpression(SESSION, rowExpression)), + new HiveTableLayoutHandle(tablePartitionFormat, partitionColumns, partitions, domainPredicate, TRUE_CONSTANT, ImmutableMap.of(dsColumn.getName(), dsColumn), tupleDomain, Optional.empty(), Optional.empty(), "layout"), Optional.empty(), TupleDomain.withColumnDomains(ImmutableMap.of( dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false), @@ -741,7 +740,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 4L)), false)))))), ImmutableList.of()); List unpartitionedPartitions = ImmutableList.of(new HivePartition(tableUnpartitioned)); - unpartitionedTableLayout = new ConnectorTableLayout(new HiveTableLayoutHandle(tableUnpartitioned, ImmutableList.of(), unpartitionedPartitions, TupleDomain.all(), TRUE_CONSTANT, ImmutableMap.of(), TupleDomain.all(), Optional.empty(), Optional.empty(), SESSION, rowExpression -> ROW_EXPRESSION_SERVICE.formatRowExpression(SESSION, rowExpression))); + unpartitionedTableLayout = new ConnectorTableLayout(new HiveTableLayoutHandle(tableUnpartitioned, ImmutableList.of(), unpartitionedPartitions, TupleDomain.all(), TRUE_CONSTANT, ImmutableMap.of(), TupleDomain.all(), Optional.empty(), Optional.empty(), "layout")); timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(timeZoneId)); } @@ -1781,8 +1780,7 @@ private void doTestBucketedTableEvolutionWithDifferentReadCount(HiveStorageForma layoutHandle.getPartitionColumnPredicate(), Optional.of(new HiveBucketHandle(bucketHandle.getColumns(), bucketHandle.getTableBucketCount(), 2)), layoutHandle.getBucketFilter(), - session, - rowExpression -> ROW_EXPRESSION_SERVICE.formatRowExpression(session, rowExpression)); + "layout"); List splits = getAllSplits(session, transaction, modifiedReadBucketCountLayoutHandle); assertEquals(splits.size(), 16); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java index ad17b97579660..706ffbd6b7ea7 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java @@ -250,7 +250,8 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa ImmutableMap.of(), TupleDomain.all(), Optional.empty(), - Optional.empty()))); + Optional.empty(), + "layout"))); HivePageSourceProvider provider = new HivePageSourceProvider(config, createTestHdfsEnvironment(config), getDefaultHiveRecordCursorProvider(config), getDefaultHiveDataStreamFactories(config), ImmutableSet.of(), TYPE_MANAGER, ROW_EXPRESSION_SERVICE); return provider.createPageSource(transaction, getSession(config), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles())); } From 5c822fa42bf019cf3b810e06f38002c2b7e47b81 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Wed, 25 Sep 2019 20:47:07 -0400 Subject: [PATCH 2/2] Remove schema from the HiveSplit --- .../hive/BackgroundHiveSplitLoader.java | 24 +++--- .../hive/HiveBatchPageSourceFactory.java | 6 +- .../facebook/presto/hive/HiveMetadata.java | 21 +++++- .../presto/hive/HivePageSourceProvider.java | 65 ++++++++++++---- .../presto/hive/HivePartitionManager.java | 35 ++++++++- .../presto/hive/HivePartitionMetadata.java | 11 +-- .../presto/hive/HivePartitionResult.java | 20 +++++ .../hive/HiveSelectivePageSourceFactory.java | 4 +- .../com/facebook/presto/hive/HiveSplit.java | 36 +++++---- .../presto/hive/HiveSplitManager.java | 27 +++++-- .../presto/hive/HiveSplitPartitionInfo.java | 41 ++++++---- .../facebook/presto/hive/HiveSplitSource.java | 6 +- .../presto/hive/HiveTableLayoutHandle.java | 32 +++++++- .../com/facebook/presto/hive/HiveType.java | 9 +++ .../com/facebook/presto/hive/HiveUtil.java | 5 +- .../presto/hive/InternalHiveSplit.java | 11 +-- .../presto/hive/metastore/Column.java | 12 +++ .../presto/hive/metastore/MetastoreUtil.java | 75 +++++++++++++++---- .../hive/orc/DwrfBatchPageSourceFactory.java | 9 ++- .../orc/DwrfSelectivePageSourceFactory.java | 7 +- .../hive/orc/OrcBatchPageSourceFactory.java | 9 ++- .../orc/OrcSelectivePageSourceFactory.java | 7 +- .../hive/parquet/ParquetPageSource.java | 3 - .../parquet/ParquetPageSourceFactory.java | 11 +-- .../hive/rcfile/RcFilePageSourceFactory.java | 14 ++-- .../presto/hive/AbstractTestHiveClient.java | 51 ++++++++++++- .../presto/hive/TestHiveFileFormats.java | 58 +++++++++----- .../presto/hive/TestHivePageSink.java | 25 ++++--- .../facebook/presto/hive/TestHiveSplit.java | 23 +++--- .../presto/hive/TestHiveSplitSource.java | 28 ++++--- .../TestOrcBatchPageSourceMemoryTracking.java | 32 ++++---- .../presto/hive/benchmark/FileFormat.java | 10 ++- .../hive/metastore/TestMetastoreUtil.java | 30 ++++++++ 33 files changed, 542 insertions(+), 215 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java index 2f8863dfd15c8..f2e8c6a719cc9 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java @@ -18,6 +18,7 @@ import com.facebook.presto.hive.HiveSplit.BucketConversion; import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.Partition; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.hive.metastore.Table; import com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryNotAllowedException; import com.facebook.presto.hive.util.InternalHiveSplitFactory; @@ -279,12 +280,15 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) throws IOException { String partitionName = partition.getHivePartition().getPartitionId(); - Properties schema = getPartitionSchema(table, partition.getPartition()); + Storage storage = partition.getPartition().map(Partition::getStorage).orElse(table.getStorage()); + int partitionDataColumnCount = partition.getPartition() + .map(p -> p.getColumns().size()) + .orElse(table.getDataColumns().size()); List partitionKeys = getPartitionKeys(table, partition.getPartition()); Path path = new Path(getPartitionLocation(table, partition.getPartition())); Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path); - InputFormat inputFormat = getInputFormat(configuration, schema, false); + InputFormat inputFormat = getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), false); FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path); boolean s3SelectPushdownEnabled = shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition()); @@ -313,7 +317,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) pathDomain, isForceLocalScheduling(session), s3SelectPushdownEnabled, - new HiveSplitPartitionInfo(schema, path.toUri(), partitionKeys, partitionName, partition.getColumnCoercions(), Optional.empty()), + new HiveSplitPartitionInfo(storage, path.toUri(), partitionKeys, partitionName, partitionDataColumnCount, partition.getPartitionSchemaDifference(), Optional.empty()), schedulerUsesHostAddresses); lastResult = addSplitsToSource(targetSplits, splitFactory); if (stopped) { @@ -347,11 +351,12 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) isForceLocalScheduling(session), s3SelectPushdownEnabled, new HiveSplitPartitionInfo( - schema, + storage, path.toUri(), partitionKeys, partitionName, - partition.getColumnCoercions(), + partitionDataColumnCount, + partition.getPartitionSchemaDifference(), bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty()), schedulerUsesHostAddresses); @@ -370,6 +375,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) // S3 Select pushdown works at the granularity of individual S3 objects, // therefore we must not split files when it is enabled. + Properties schema = getHiveSchema(storage.getSerdeParameters(), table.getParameters()); boolean splittable = getHeaderCount(schema) == 0 && getFooterCount(schema) == 0 && !s3SelectPushdownEnabled; // Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping @@ -560,14 +566,6 @@ private static List getPartitionKeys(Table table, Optional partition) - { - if (!partition.isPresent()) { - return getHiveSchema(table); - } - return getHiveSchema(partition.get(), table); - } - public static class BucketSplitInfo { private final List bucketColumns; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBatchPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBatchPageSourceFactory.java index eb0f7f9541f79..bda7427f33c01 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBatchPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBatchPageSourceFactory.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.predicate.TupleDomain; @@ -21,8 +22,8 @@ import org.joda.time.DateTimeZone; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Properties; public interface HiveBatchPageSourceFactory { @@ -33,7 +34,8 @@ Optional createPageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, + Map tableParameters, List columns, TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 9add5067ccba8..a4699adf5af3d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -1765,7 +1765,10 @@ public ConnectorPushdownFilterResult pushdownFilter(ConnectorSession session, Co session, new HiveTableLayoutHandle( tableName, - ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()), + hivePartitionResult.getPartitionColumns(), + // remove comments to optimize serialization costs + pruneColumnComments(hivePartitionResult.getDataColumns()), + hivePartitionResult.getTableParameters(), hivePartitionResult.getPartitions(), domainPredicate, decomposedFilter.getRemainingExpression(), @@ -1839,7 +1842,10 @@ public List getTableLayouts(ConnectorSession session session, new HiveTableLayoutHandle( handle.getSchemaTableName(), - ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()), + hivePartitionResult.getPartitionColumns(), + // remove comments to optimize serialization costs + pruneColumnComments(hivePartitionResult.getDataColumns()), + hivePartitionResult.getTableParameters(), hivePartitionResult.getPartitions(), domainPredicate, TRUE_CONSTANT, @@ -1873,11 +1879,18 @@ private boolean isPushdownFilterEnabled(ConnectorSession session, ConnectorTable return false; } + private List pruneColumnComments(List columns) + { + return columns.stream() + .map(column -> new Column(column.getName(), column.getType(), Optional.empty())) + .collect(toImmutableList()); + } + @Override public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle layoutHandle) { HiveTableLayoutHandle hiveLayoutHandle = (HiveTableLayoutHandle) layoutHandle; - List partitionColumns = hiveLayoutHandle.getPartitionColumns(); + List partitionColumns = ImmutableList.copyOf(hiveLayoutHandle.getPartitionColumns()); List partitions = hiveLayoutHandle.getPartitions().get(); TupleDomain predicate = createPredicate(partitionColumns, partitions); @@ -2013,6 +2026,8 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se return new HiveTableLayoutHandle( hiveLayoutHandle.getSchemaTableName(), hiveLayoutHandle.getPartitionColumns(), + hiveLayoutHandle.getDataColumns(), + hiveLayoutHandle.getTableParameters(), hiveLayoutHandle.getPartitions().get(), hiveLayoutHandle.getDomainPredicate(), hiveLayoutHandle.getRemainingPredicate(), diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java index 76fc80951c29d..6fedff80ce223 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java @@ -15,6 +15,8 @@ import com.facebook.presto.hive.HdfsEnvironment.HdfsContext; import com.facebook.presto.hive.HiveSplit.BucketConversion; +import com.facebook.presto.hive.metastore.Column; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; @@ -22,6 +24,7 @@ import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordPageSource; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.Subfield; import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; @@ -53,6 +56,8 @@ import static com.facebook.presto.hive.HivePageSourceProvider.ColumnMapping.toColumnHandles; import static com.facebook.presto.hive.HiveSessionProperties.isPushdownFilterEnabled; import static com.facebook.presto.hive.HiveUtil.getPrefilledColumnValue; +import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema; +import static com.facebook.presto.hive.metastore.MetastoreUtil.reconstructPartitionSchema; import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -99,7 +104,7 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti { HiveTableLayoutHandle hiveLayout = (HiveTableLayoutHandle) layout; - List hiveColumns = columns.stream() + List selectedColumns = columns.stream() .map(HiveColumnHandle.class::cast) .collect(toList()); @@ -109,7 +114,7 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session, hiveSplit.getDatabase(), hiveSplit.getTable()), path); if (isPushdownFilterEnabled(session)) { - return createSelectivePageSource(selectivePageSourceFactories, configuration, session, hiveSplit, hiveLayout, assignUniqueIndicesToPartitionColumns(hiveColumns), hiveStorageTimeZone, rowExpressionService); + return createSelectivePageSource(selectivePageSourceFactories, configuration, session, hiveSplit, hiveLayout, assignUniqueIndicesToPartitionColumns(selectedColumns), hiveStorageTimeZone, rowExpressionService); } Optional pageSource = createHivePageSource( @@ -122,15 +127,20 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti hiveSplit.getStart(), hiveSplit.getLength(), hiveSplit.getFileSize(), - hiveSplit.getSchema(), + hiveSplit.getStorage(), hiveLayout.getDomainPredicate() .transform(Subfield::getRootName) .transform(hiveLayout.getPredicateColumns()::get), - hiveColumns, + selectedColumns, hiveSplit.getPartitionKeys(), hiveStorageTimeZone, typeManager, - hiveSplit.getColumnCoercions(), + hiveLayout.getSchemaTableName(), + hiveLayout.getPartitionColumns(), + hiveLayout.getDataColumns(), + hiveLayout.getTableParameters(), + hiveSplit.getPartitionDataColumnCount(), + hiveSplit.getPartitionSchemaDifference(), hiveSplit.getBucketConversion(), hiveSplit.isS3SelectPushdownEnabled()); if (pageSource.isPresent()) { @@ -166,7 +176,7 @@ private static ConnectorPageSource createSelectivePageSource( split.getPartitionKeys(), allColumns, ImmutableList.of(), - split.getColumnCoercions(), // TODO Include predicateColumns + split.getPartitionSchemaDifference(), // TODO Include predicateColumns path, split.getTableBucketNumber()); @@ -191,7 +201,7 @@ private static ConnectorPageSource createSelectivePageSource( split.getStart(), split.getLength(), split.getFileSize(), - split.getSchema(), + split.getStorage(), toColumnHandles(columnMappings, true), prefilledValues, outputColumns, @@ -233,13 +243,18 @@ public static Optional createHivePageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, TupleDomain effectivePredicate, List hiveColumns, List partitionKeys, DateTimeZone hiveStorageTimeZone, TypeManager typeManager, - Map columnCoercions, + SchemaTableName tableName, + List partitionKeyColumnHandles, + List tableDataColumns, + Map tableParameters, + int partitionDataColumnCount, + Map partitionSchemaDifference, Optional bucketConversion, boolean s3SelectPushdownEnabled) { @@ -247,7 +262,7 @@ public static Optional createHivePageSource( partitionKeys, hiveColumns, bucketConversion.map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()), - columnCoercions, + partitionSchemaDifference, path, tableBucketNumber); List regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings); @@ -262,7 +277,8 @@ public static Optional createHivePageSource( start, length, fileSize, - schema, + storage, + tableParameters, toColumnHandles(regularAndInterimColumnMappings, true), effectivePredicate, hiveStorageTimeZone); @@ -281,6 +297,20 @@ public static Optional createHivePageSource( // GenericHiveRecordCursor will automatically do the coercion without HiveCoercionRecordCursor boolean doCoercion = !(provider instanceof GenericHiveRecordCursorProvider); + List partitionDataColumns = reconstructPartitionSchema(tableDataColumns, partitionDataColumnCount, partitionSchemaDifference); + List partitionKeyColumns = partitionKeyColumnHandles.stream() + .map(handle -> new Column(handle.getName(), handle.getHiveType(), handle.getComment())) + .collect(toImmutableList()); + + Properties schema = getHiveSchema( + storage, + partitionDataColumns, + tableDataColumns, + tableParameters, + tableName.getSchemaName(), + tableName.getTableName(), + partitionKeyColumns); + Optional cursor = provider.createRecordCursor( configuration, session, @@ -415,14 +445,14 @@ public Optional getCoercionFrom() /** * @param columns columns that need to be returned to engine * @param requiredInterimColumns columns that are needed for processing, but shouldn't be returned to engine (may overlaps with columns) - * @param columnCoercions map from hive column index to hive type + * @param partitionSchemaDifference map from hive column index to hive type * @param bucketNumber empty if table is not bucketed, a number within [0, # bucket in table) otherwise */ public static List buildColumnMappings( List partitionKeys, List columns, List requiredInterimColumns, - Map columnCoercions, + Map partitionSchemaDifference, Path path, OptionalInt bucketNumber) { @@ -431,7 +461,14 @@ public static List buildColumnMappings( Set regularColumnIndices = new HashSet<>(); ImmutableList.Builder columnMappings = ImmutableList.builder(); for (HiveColumnHandle column : columns) { - Optional coercionFrom = Optional.ofNullable(columnCoercions.get(column.getHiveColumnIndex())); + // will be present if the partition has a different schema (column type, column name) for the column + Optional partitionColumn = Optional.ofNullable(partitionSchemaDifference.get(column.getHiveColumnIndex())); + Optional coercionFrom = Optional.empty(); + // we don't care if only the column name has changed + if (partitionColumn.isPresent() && !partitionColumn.get().getType().equals(column.getHiveType())) { + coercionFrom = Optional.of(partitionColumn.get().getType()); + } + if (column.getColumnType() == REGULAR) { checkArgument(regularColumnIndices.add(column.getHiveColumnIndex()), "duplicate hiveColumnIndex in columns list"); columnMappings.add(regular(column, regularIndex, coercionFrom)); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java index cb32616a18cb6..8707eb4095230 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java @@ -169,7 +169,16 @@ && queryAccessesTooManyBuckets(hiveBucketHandle.get(), bucketFilter, partitions, } if (effectivePredicate.isNone()) { - return new HivePartitionResult(partitionColumns, partitions, TupleDomain.none(), TupleDomain.none(), TupleDomain.none(), hiveBucketHandle, Optional.empty()); + return new HivePartitionResult( + partitionColumns, + table.getDataColumns(), + table.getParameters(), + partitions, + TupleDomain.none(), + TupleDomain.none(), + TupleDomain.none(), + hiveBucketHandle, + Optional.empty()); } TupleDomain compactEffectivePredicate = effectivePredicate.compact(domainCompactionThreshold); @@ -177,6 +186,8 @@ && queryAccessesTooManyBuckets(hiveBucketHandle.get(), bucketFilter, partitions, if (partitionColumns.isEmpty()) { return new HivePartitionResult( partitionColumns, + table.getDataColumns(), + table.getParameters(), partitions, compactEffectivePredicate, effectivePredicate, @@ -188,7 +199,16 @@ && queryAccessesTooManyBuckets(hiveBucketHandle.get(), bucketFilter, partitions, // All partition key domains will be fully evaluated, so we don't need to include those TupleDomain remainingTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(effectivePredicate.getDomains().get(), not(Predicates.in(partitionColumns)))); TupleDomain enforcedTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.in(partitionColumns))); - return new HivePartitionResult(partitionColumns, partitions, compactEffectivePredicate, remainingTupleDomain, enforcedTupleDomain, hiveBucketHandle, bucketFilter); + return new HivePartitionResult( + partitionColumns, + table.getDataColumns(), + table.getParameters(), + partitions, + compactEffectivePredicate, + remainingTupleDomain, + enforcedTupleDomain, + hiveBucketHandle, + bucketFilter); } private boolean queryUsesHiveBucketColumn(TupleDomain effectivePredicate) @@ -243,7 +263,16 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor .collect(toImmutableList()); Optional bucketHandle = shouldIgnoreTableBucketing(session) ? Optional.empty() : getHiveBucketHandle(table); - return new HivePartitionResult(partitionColumns, partitionList, TupleDomain.all(), TupleDomain.all(), TupleDomain.none(), bucketHandle, Optional.empty()); + return new HivePartitionResult( + partitionColumns, + table.getDataColumns(), + table.getParameters(), + partitionList, + TupleDomain.all(), + TupleDomain.all(), + TupleDomain.none(), + bucketHandle, + Optional.empty()); } private Optional parseValuesAndFilterPartition( diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionMetadata.java index 40314d3e560d3..87299538d3b5d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionMetadata.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.Partition; import java.util.Map; @@ -24,16 +25,16 @@ public class HivePartitionMetadata { private final Optional partition; private final HivePartition hivePartition; - private final Map columnCoercions; + private final Map partitionSchemaDifference; HivePartitionMetadata( HivePartition hivePartition, Optional partition, - Map columnCoercions) + Map partitionSchemaDifference) { this.partition = requireNonNull(partition, "partition is null"); this.hivePartition = requireNonNull(hivePartition, "hivePartition is null"); - this.columnCoercions = requireNonNull(columnCoercions, "columnCoercions is null"); + this.partitionSchemaDifference = requireNonNull(partitionSchemaDifference, "partitionSchemaDifference is null"); } public HivePartition getHivePartition() @@ -49,8 +50,8 @@ public Optional getPartition() return partition; } - public Map getColumnCoercions() + public Map getPartitionSchemaDifference() { - return columnCoercions; + return partitionSchemaDifference; } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionResult.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionResult.java index a824d14f83c6b..75076ef279055 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionResult.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionResult.java @@ -14,10 +14,14 @@ package com.facebook.presto.hive; import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter; +import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.predicate.TupleDomain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static java.util.Objects.requireNonNull; @@ -33,6 +37,8 @@ public class HivePartitionResult { private final List partitionColumns; + private final List dataColumns; + private final Map tableParameters; private final List partitions; private final TupleDomain effectivePredicate; private final TupleDomain unenforcedConstraint; @@ -42,6 +48,8 @@ public class HivePartitionResult public HivePartitionResult( List partitionColumns, + List dataColumns, + Map tableParameters, List partitions, TupleDomain effectivePredicate, TupleDomain unenforcedConstraint, @@ -50,6 +58,8 @@ public HivePartitionResult( Optional bucketFilter) { this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null"); + this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null")); + this.tableParameters = ImmutableMap.copyOf(requireNonNull(tableParameters, "tableProperties is null")); this.partitions = requireNonNull(partitions, "partitions is null"); this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null"); this.unenforcedConstraint = requireNonNull(unenforcedConstraint, "unenforcedConstraint is null"); @@ -63,6 +73,16 @@ public List getPartitionColumns() return partitionColumns; } + public List getDataColumns() + { + return dataColumns; + } + + public Map getTableParameters() + { + return tableParameters; + } + public List getPartitions() { return partitions; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java index 34c70b77967d2..f449f807dbb0b 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.Subfield; @@ -25,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; public interface HiveSelectivePageSourceFactory { @@ -36,7 +36,7 @@ Optional createPageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, List columns, Map prefilledValues, // key is hiveColumnIndex List outputColumns, // element is hiveColumnIndex diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java index 3bc12dfd95cc3..93a13433762ea 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Column; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.HostAddress; import com.fasterxml.jackson.annotation.JsonCreator; @@ -25,7 +27,6 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; -import java.util.Properties; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; @@ -38,7 +39,7 @@ public class HiveSplit private final long start; private final long length; private final long fileSize; - private final Properties schema; + private final Storage storage; private final List partitionKeys; private final List addresses; private final String database; @@ -47,7 +48,8 @@ public class HiveSplit private final OptionalInt readBucketNumber; private final OptionalInt tableBucketNumber; private final boolean forceLocalScheduling; - private final Map columnCoercions; // key: hiveColumnIndex + private final int partitionDataColumnCount; + private final Map partitionSchemaDifference; // key: hiveColumnIndex private final Optional bucketConversion; private final boolean s3SelectPushdownEnabled; @@ -60,13 +62,14 @@ public HiveSplit( @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("fileSize") long fileSize, - @JsonProperty("schema") Properties schema, + @JsonProperty("storage") Storage storage, @JsonProperty("partitionKeys") List partitionKeys, @JsonProperty("addresses") List addresses, @JsonProperty("readBucketNumber") OptionalInt readBucketNumber, @JsonProperty("tableBucketNumber") OptionalInt tableBucketNumber, @JsonProperty("forceLocalScheduling") boolean forceLocalScheduling, - @JsonProperty("columnCoercions") Map columnCoercions, + @JsonProperty("partitionDataColumnCount") int partitionDataColumnCount, + @JsonProperty("partitionSchemaDifference") Map partitionSchemaDifference, @JsonProperty("bucketConversion") Optional bucketConversion, @JsonProperty("s3SelectPushdownEnabled") boolean s3SelectPushdownEnabled) { @@ -77,12 +80,12 @@ public HiveSplit( requireNonNull(table, "table is null"); requireNonNull(partitionName, "partitionName is null"); requireNonNull(path, "path is null"); - requireNonNull(schema, "schema is null"); + requireNonNull(storage, "storage is null"); requireNonNull(partitionKeys, "partitionKeys is null"); requireNonNull(addresses, "addresses is null"); requireNonNull(readBucketNumber, "readBucketNumber is null"); requireNonNull(tableBucketNumber, "tableBucketNumber is null"); - requireNonNull(columnCoercions, "columnCoercions is null"); + requireNonNull(partitionSchemaDifference, "partitionSchemaDifference is null"); requireNonNull(bucketConversion, "bucketConversion is null"); this.database = database; @@ -92,13 +95,14 @@ public HiveSplit( this.start = start; this.length = length; this.fileSize = fileSize; - this.schema = schema; + this.storage = storage; this.partitionKeys = ImmutableList.copyOf(partitionKeys); this.addresses = ImmutableList.copyOf(addresses); this.readBucketNumber = readBucketNumber; this.tableBucketNumber = tableBucketNumber; this.forceLocalScheduling = forceLocalScheduling; - this.columnCoercions = columnCoercions; + this.partitionDataColumnCount = partitionDataColumnCount; + this.partitionSchemaDifference = partitionSchemaDifference; this.bucketConversion = bucketConversion; this.s3SelectPushdownEnabled = s3SelectPushdownEnabled; } @@ -146,9 +150,9 @@ public long getFileSize() } @JsonProperty - public Properties getSchema() + public Storage getStorage() { - return schema; + return storage; } @JsonProperty @@ -183,9 +187,15 @@ public boolean isForceLocalScheduling() } @JsonProperty - public Map getColumnCoercions() + public int getPartitionDataColumnCount() { - return columnCoercions; + return partitionDataColumnCount; + } + + @JsonProperty + public Map getPartitionSchemaDifference() + { + return partitionSchemaDifference; } @JsonProperty diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java index 434158f573c8e..9549fb3bdd676 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java @@ -356,12 +356,18 @@ private Iterable getPartitionMetadata( if ((tableColumns == null) || (partitionColumns == null)) { throw new PrestoException(HIVE_INVALID_METADATA, format("Table '%s' or partition '%s' has null columns", tableName, partName)); } - ImmutableMap.Builder columnCoercions = ImmutableMap.builder(); - for (int i = 0; i < min(partitionColumns.size(), tableColumns.size()); i++) { + ImmutableMap.Builder partitionSchemaDifference = ImmutableMap.builder(); + for (int i = 0; i < partitionColumns.size(); i++) { + Column partitionColumn = partitionColumns.get(i); + + if (i >= tableColumns.size()) { + partitionSchemaDifference.put(i, partitionColumn); + continue; + } + HiveType tableType = tableColumns.get(i).getType(); - HiveType partitionType = partitionColumns.get(i).getType(); - if (!tableType.equals(partitionType)) { - if (!coercionPolicy.canCoerce(partitionType, tableType)) { + if (!tableType.equals(partitionColumn.getType())) { + if (!coercionPolicy.canCoerce(partitionColumn.getType(), tableType)) { throw new PrestoException(HIVE_PARTITION_SCHEMA_MISMATCH, format("" + "There is a mismatch between the table and partition schemas. " + "The types are incompatible and cannot be coerced. " + @@ -372,9 +378,14 @@ private Iterable getPartitionMetadata( tableType, partName, partitionColumns.get(i).getName(), - partitionType)); + partitionColumn.getType())); } - columnCoercions.put(i, partitionType.getHiveTypeName()); + partitionSchemaDifference.put(i, partitionColumn); + continue; + } + + if (!tableColumns.get(i).getName().equals(partitionColumn.getName())) { + partitionSchemaDifference.put(i, partitionColumn); } } @@ -402,7 +413,7 @@ private Iterable getPartitionMetadata( } } - results.add(new HivePartitionMetadata(hivePartition, Optional.of(partition), columnCoercions.build())); + results.add(new HivePartitionMetadata(hivePartition, Optional.of(partition), partitionSchemaDifference.build())); } return results.build(); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitPartitionInfo.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitPartitionInfo.java index e796e7b60ddac..328fecbcbeaf0 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitPartitionInfo.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitPartitionInfo.java @@ -14,6 +14,8 @@ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Column; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.spi.PrestoException; import org.openjdk.jol.info.ClassLayout; @@ -22,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -38,36 +39,39 @@ public class HiveSplitPartitionInfo private static final int INSTANCE_SIZE = ClassLayout.parseClass(HiveSplitPartitionInfo.class).instanceSize(); private static final int INTEGER_INSTANCE_SIZE = ClassLayout.parseClass(Integer.class).instanceSize(); - private final Properties schema; + private final Storage storage; private final URI path; private final List partitionKeys; private final String partitionName; - private final Map columnCoercions; + private final int partitionDataColumnCount; + private final Map partitionSchemaDifference; private final Optional bucketConversion; // keep track of how many InternalHiveSplits reference this PartitionInfo. private final AtomicInteger references = new AtomicInteger(0); public HiveSplitPartitionInfo( - Properties schema, + Storage storage, URI path, List partitionKeys, String partitionName, - Map columnCoercions, + int partitionDataColumnCount, + Map partitionSchemaDifference, Optional bucketConversion) { - requireNonNull(schema, "schema is null"); + requireNonNull(storage, "storage is null"); requireNonNull(path, "path is null"); requireNonNull(partitionKeys, "partitionKeys is null"); requireNonNull(partitionName, "partitionName is null"); - requireNonNull(columnCoercions, "columnCoersions is null"); + requireNonNull(partitionSchemaDifference, "partitionSchemaDifference is null"); requireNonNull(bucketConversion, "bucketConversion is null"); - this.schema = schema; + this.storage = storage; this.path = ensurePathHasTrailingSlash(path); this.partitionKeys = partitionKeys; this.partitionName = partitionName; - this.columnCoercions = columnCoercions; + this.partitionDataColumnCount = partitionDataColumnCount; + this.partitionSchemaDifference = partitionSchemaDifference; this.bucketConversion = bucketConversion; } @@ -90,9 +94,9 @@ private static URI ensurePathHasTrailingSlash(URI path) return path; } - public Properties getSchema() + public Storage getStorage() { - return schema; + return storage; } public List getPartitionKeys() @@ -105,9 +109,14 @@ public String getPartitionName() return partitionName; } - public Map getColumnCoercions() + public int getPartitionDataColumnCount() { - return columnCoercions; + return partitionDataColumnCount; + } + + public Map getPartitionSchemaDifference() + { + return partitionSchemaDifference; } public Optional getBucketConversion() @@ -124,9 +133,9 @@ public int getEstimatedSizeInBytes() } result += partitionName.length() * Character.BYTES; - result += sizeOfObjectArray(columnCoercions.size()); - for (HiveTypeName hiveTypeName : columnCoercions.values()) { - result += INTEGER_INSTANCE_SIZE + hiveTypeName.getEstimatedSizeInBytes(); + result += sizeOfObjectArray(partitionSchemaDifference.size()); + for (Column column : partitionSchemaDifference.values()) { + result += INTEGER_INSTANCE_SIZE + column.getEstimatedSizeInBytes(); } return result; } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java index 9b695fdcc16a4..41c42f3e312ba 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java @@ -61,7 +61,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.Maps.transformValues; import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.failedFuture; @@ -475,13 +474,14 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan internalSplit.getStart(), splitBytes, internalSplit.getFileSize(), - internalSplit.getSchema(), + internalSplit.getPartitionInfo().getStorage(), internalSplit.getPartitionKeys(), block.getAddresses(), internalSplit.getReadBucketNumber(), internalSplit.getTableBucketNumber(), internalSplit.isForceLocalScheduling(), - transformValues(internalSplit.getColumnCoercions(), HiveTypeName::toHiveType), + internalSplit.getPartitionInfo().getPartitionDataColumnCount(), + internalSplit.getPartitionSchemaDifference(), internalSplit.getBucketConversion(), internalSplit.isS3SelectPushdownEnabled())); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java index 4af70f61c3f35..fc1b6a01f5afc 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java @@ -14,6 +14,7 @@ package com.facebook.presto.hive; import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter; +import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.SchemaTableName; @@ -24,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import javax.annotation.Nullable; @@ -37,7 +39,9 @@ public final class HiveTableLayoutHandle implements ConnectorTableLayoutHandle { private final SchemaTableName schemaTableName; - private final List partitionColumns; + private final List partitionColumns; + private final List dataColumns; + private final Map tableParameters; private final TupleDomain domainPredicate; private final RowExpression remainingPredicate; private final Map predicateColumns; @@ -53,7 +57,9 @@ public final class HiveTableLayoutHandle @JsonCreator public HiveTableLayoutHandle( @JsonProperty("schemaTableName") SchemaTableName schemaTableName, - @JsonProperty("partitionColumns") List partitionColumns, + @JsonProperty("partitionColumns") List partitionColumns, + @JsonProperty("dataColumns") List dataColumns, + @JsonProperty("tableParameters") Map tableParameters, @JsonProperty("domainPredicate") TupleDomain domainPredicate, @JsonProperty("remainingPredicate") RowExpression remainingPredicate, @JsonProperty("predicateColumns") Map predicateColumns, @@ -64,6 +70,8 @@ public HiveTableLayoutHandle( { this.schemaTableName = requireNonNull(schemaTableName, "table is null"); this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null")); + this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null")); + this.tableParameters = ImmutableMap.copyOf(requireNonNull(tableParameters, "tableProperties is null")); this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null"); this.remainingPredicate = requireNonNull(remainingPredicate, "remainingPredicate is null"); this.predicateColumns = requireNonNull(predicateColumns, "predicateColumns is null"); @@ -76,7 +84,9 @@ public HiveTableLayoutHandle( public HiveTableLayoutHandle( SchemaTableName schemaTableName, - List partitionColumns, + List partitionColumns, + List dataColumns, + Map tableParameters, List partitions, TupleDomain domainPredicate, RowExpression remainingPredicate, @@ -88,6 +98,8 @@ public HiveTableLayoutHandle( { this.schemaTableName = requireNonNull(schemaTableName, "table is null"); this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null")); + this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null")); + this.tableParameters = ImmutableMap.copyOf(requireNonNull(tableParameters, "tableProperties is null")); this.partitions = requireNonNull(partitions, "partitions is null"); this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null"); this.remainingPredicate = requireNonNull(remainingPredicate, "remainingPredicate is null"); @@ -105,11 +117,23 @@ public SchemaTableName getSchemaTableName() } @JsonProperty - public List getPartitionColumns() + public List getPartitionColumns() { return partitionColumns; } + @JsonProperty + public List getDataColumns() + { + return dataColumns; + } + + @JsonProperty + public Map getTableParameters() + { + return tableParameters; + } + /** * Partitions are dropped when HiveTableLayoutHandle is serialized. * diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveType.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveType.java index ea4fbc9cf99f0..935f131a69eb0 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveType.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveType.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Locale; @@ -72,6 +73,8 @@ public final class HiveType { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(HiveType.class).instanceSize(); + public static final HiveType HIVE_BOOLEAN = new HiveType(booleanTypeInfo); public static final HiveType HIVE_BYTE = new HiveType(byteTypeInfo); public static final HiveType HIVE_SHORT = new HiveType(shortTypeInfo); @@ -283,4 +286,10 @@ public static Type getPrimitiveType(PrimitiveTypeInfo typeInfo) return null; } } + + public int getEstimatedRetainedSizeInBytes() + { + // Size of TypeInfo is not accounted as TypeInfo's are cached and retained by the TypeInfoFactory + return INSTANCE_SIZE + hiveTypeName.getEstimatedSizeInBytes(); + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java index c363ab2d1848e..d4d98c19ef88e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java @@ -212,7 +212,7 @@ private HiveUtil() // Tell hive the columns we would like to read, this lets hive optimize reading column oriented files setReadColumns(configuration, readHiveColumnIndexes); - InputFormat inputFormat = getInputFormat(configuration, schema, true); + InputFormat inputFormat = getInputFormat(configuration, getInputFormatName(schema), true); JobConf jobConf = toJobConf(configuration); FileSplit fileSplit = new FileSplit(path, start, length, (String[]) null); @@ -285,9 +285,8 @@ public static Optional getCompressionCodec(TextInputFormat inp return Optional.ofNullable(compressionCodecFactory.getCodec(file)); } - static InputFormat getInputFormat(Configuration configuration, Properties schema, boolean symlinkTarget) + static InputFormat getInputFormat(Configuration configuration, String inputFormatName, boolean symlinkTarget) { - String inputFormatName = getInputFormatName(schema); try { JobConf jobConf = toJobConf(configuration); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/InternalHiveSplit.java b/presto-hive/src/main/java/com/facebook/presto/hive/InternalHiveSplit.java index d0e1bd2f050df..d603ec9df46a4 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/InternalHiveSplit.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/InternalHiveSplit.java @@ -14,6 +14,7 @@ package com.facebook.presto.hive; import com.facebook.presto.hive.HiveSplit.BucketConversion; +import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.spi.HostAddress; import com.google.common.collect.ImmutableList; import org.apache.hadoop.fs.Path; @@ -25,7 +26,6 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; -import java.util.Properties; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; @@ -137,11 +137,6 @@ public boolean isS3SelectPushdownEnabled() return s3SelectPushdownEnabled; } - public Properties getSchema() - { - return partitionInfo.getSchema(); - } - public List getPartitionKeys() { return partitionInfo.getPartitionKeys(); @@ -172,9 +167,9 @@ public boolean isForceLocalScheduling() return forceLocalScheduling; } - public Map getColumnCoercions() + public Map getPartitionSchemaDifference() { - return partitionInfo.getColumnCoercions(); + return partitionInfo.getPartitionSchemaDifference(); } public Optional getBucketConversion() diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/metastore/Column.java b/presto-hive/src/main/java/com/facebook/presto/hive/metastore/Column.java index 950554c8709c9..3e433ddd8beab 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/metastore/Column.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/metastore/Column.java @@ -16,6 +16,7 @@ import com.facebook.presto.hive.HiveType; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.openjdk.jol.info.ClassLayout; import javax.annotation.concurrent.Immutable; @@ -28,6 +29,8 @@ @Immutable public class Column { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(Column.class).instanceSize(); + private final String name; private final HiveType type; private final Optional comment; @@ -91,4 +94,13 @@ public int hashCode() { return Objects.hash(name, type, comment); } + + public int getEstimatedSizeInBytes() + { + int result = INSTANCE_SIZE; + result += name.length() * Character.BYTES; + result += type.getEstimatedRetainedSizeInBytes(); + result += comment.map(String::length).orElse(0); + return result; + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/metastore/MetastoreUtil.java b/presto-hive/src/main/java/com/facebook/presto/hive/metastore/MetastoreUtil.java index 03c14365a86bb..8bb0e64bf7201 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/metastore/MetastoreUtil.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/metastore/MetastoreUtil.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TableNotFoundException; +import com.google.common.collect.ImmutableList; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -86,11 +87,11 @@ public static Properties getHiveSchema(Partition partition, Table table) table.getPartitionColumns()); } - private static Properties getHiveSchema( - Storage sd, - List dataColumns, + public static Properties getHiveSchema( + Storage storage, + List partitionDataColumns, List tableDataColumns, - Map parameters, + Map tableParameters, String databaseName, String tableName, List partitionKeys) @@ -100,27 +101,27 @@ private static Properties getHiveSchema( Properties schema = new Properties(); - schema.setProperty(FILE_INPUT_FORMAT, sd.getStorageFormat().getInputFormat()); - schema.setProperty(FILE_OUTPUT_FORMAT, sd.getStorageFormat().getOutputFormat()); + schema.setProperty(FILE_INPUT_FORMAT, storage.getStorageFormat().getInputFormat()); + schema.setProperty(FILE_OUTPUT_FORMAT, storage.getStorageFormat().getOutputFormat()); schema.setProperty(META_TABLE_NAME, databaseName + "." + tableName); - schema.setProperty(META_TABLE_LOCATION, sd.getLocation()); + schema.setProperty(META_TABLE_LOCATION, storage.getLocation()); - if (sd.getBucketProperty().isPresent()) { - List bucketedBy = sd.getBucketProperty().get().getBucketedBy(); + if (storage.getBucketProperty().isPresent()) { + List bucketedBy = storage.getBucketProperty().get().getBucketedBy(); if (!bucketedBy.isEmpty()) { schema.setProperty(BUCKET_FIELD_NAME, bucketedBy.get(0)); } - schema.setProperty(BUCKET_COUNT, Integer.toString(sd.getBucketProperty().get().getBucketCount())); + schema.setProperty(BUCKET_COUNT, Integer.toString(storage.getBucketProperty().get().getBucketCount())); } else { schema.setProperty(BUCKET_COUNT, "0"); } - for (Map.Entry param : sd.getSerdeParameters().entrySet()) { + for (Map.Entry param : storage.getSerdeParameters().entrySet()) { schema.setProperty(param.getKey(), (param.getValue() != null) ? param.getValue() : ""); } - schema.setProperty(SERIALIZATION_LIB, sd.getStorageFormat().getSerDe()); + schema.setProperty(SERIALIZATION_LIB, storage.getStorageFormat().getSerDe()); StringBuilder columnNameBuilder = new StringBuilder(); StringBuilder columnTypeBuilder = new StringBuilder(); @@ -143,7 +144,7 @@ private static Properties getHiveSchema( schema.setProperty(META_TABLE_COLUMN_TYPES, columnTypes); schema.setProperty("columns.comments", columnCommentBuilder.toString()); - schema.setProperty(SERIALIZATION_DDL, toThriftDdl(tableName, dataColumns)); + schema.setProperty(SERIALIZATION_DDL, toThriftDdl(tableName, partitionDataColumns)); String partString = ""; String partStringSep = ""; @@ -164,8 +165,8 @@ private static Properties getHiveSchema( schema.setProperty(META_TABLE_PARTITION_COLUMN_TYPES, partTypesString); } - if (parameters != null) { - for (Map.Entry entry : parameters.entrySet()) { + if (tableParameters != null) { + for (Map.Entry entry : tableParameters.entrySet()) { // add non-null parameters to the schema if (entry.getValue() != null) { schema.setProperty(entry.getKey(), entry.getValue()); @@ -176,6 +177,50 @@ private static Properties getHiveSchema( return schema; } + public static Properties getHiveSchema(Map serdeParameters, Map tableParameters) + { + Properties schema = new Properties(); + for (Map.Entry param : serdeParameters.entrySet()) { + schema.setProperty(param.getKey(), (param.getValue() != null) ? param.getValue() : ""); + } + for (Map.Entry entry : tableParameters.entrySet()) { + // add non-null parameters to the schema + if (entry.getValue() != null) { + schema.setProperty(entry.getKey(), entry.getValue()); + } + } + return schema; + } + + /** + * Recreates partition schema based on the table schema and the column + * coercions map. + * + * partitionColumnCount is needed to handle cases when the partition + * has less columns than the table. + * + * If the partition has more columns than the table does, the partitionSchemaDifference + * map is expected to contain information for the missing columns. + */ + public static List reconstructPartitionSchema(List tableSchema, int partitionColumnCount, Map partitionSchemaDifference) + { + ImmutableList.Builder columns = ImmutableList.builder(); + for (int i = 0; i < partitionColumnCount; i++) { + Column column = partitionSchemaDifference.get(i); + if (column == null) { + checkArgument( + i < tableSchema.size(), + "column descriptor for column with hiveColumnIndex %s not found: tableSchema: %s, partitionSchemaDifference: %s", + i, + tableSchema, + partitionSchemaDifference); + column = tableSchema.get(i); + } + columns.add(column); + } + return columns.build(); + } + public static ProtectMode getProtectMode(Partition partition) { return getProtectMode(partition.getParameters()); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java index 5098998c3ebd2..48311fe9688d6 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java @@ -19,6 +19,7 @@ import com.facebook.presto.hive.HiveBatchPageSourceFactory; import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PrestoException; @@ -31,8 +32,8 @@ import javax.inject.Inject; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Properties; import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; import static com.facebook.presto.hive.HiveSessionProperties.getOrcLazyReadSmallRanges; @@ -41,7 +42,6 @@ import static com.facebook.presto.hive.HiveSessionProperties.getOrcMaxReadBlockSize; import static com.facebook.presto.hive.HiveSessionProperties.getOrcStreamBufferSize; import static com.facebook.presto.hive.HiveSessionProperties.getOrcTinyStripeThreshold; -import static com.facebook.presto.hive.HiveUtil.isDeserializerClass; import static com.facebook.presto.hive.orc.OrcBatchPageSourceFactory.createOrcPageSource; import static com.facebook.presto.orc.OrcEncoding.DWRF; import static java.util.Objects.requireNonNull; @@ -70,12 +70,13 @@ public Optional createPageSource(Configuration co long start, long length, long fileSize, - Properties schema, + Storage storage, + Map tableParameters, List columns, TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone) { - if (!isDeserializerClass(schema, OrcSerde.class)) { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { return Optional.empty(); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java index c8ecbc5facfbe..a66a65cb3d176 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java @@ -19,6 +19,7 @@ import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.HiveColumnHandle; import com.facebook.presto.hive.HiveSelectivePageSourceFactory; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PrestoException; @@ -37,10 +38,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; -import static com.facebook.presto.hive.HiveUtil.isDeserializerClass; import static com.facebook.presto.hive.orc.OrcSelectivePageSourceFactory.createOrcPageSource; import static com.facebook.presto.orc.OrcEncoding.DWRF; import static java.util.Objects.requireNonNull; @@ -74,7 +73,7 @@ public Optional createPageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, List columns, Map prefilledValues, List outputColumns, @@ -82,7 +81,7 @@ public Optional createPageSource( RowExpression remainingPredicate, DateTimeZone hiveStorageTimeZone) { - if (!isDeserializerClass(schema, OrcSerde.class)) { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { return Optional.empty(); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java index f26622a8b41c3..32ef41d6c0114 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java @@ -18,6 +18,7 @@ import com.facebook.presto.hive.HiveBatchPageSourceFactory; import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.memory.context.AggregatedMemoryContext; import com.facebook.presto.orc.OrcBatchRecordReader; import com.facebook.presto.orc.OrcDataSource; @@ -49,8 +50,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Properties; import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; @@ -63,7 +64,6 @@ import static com.facebook.presto.hive.HiveSessionProperties.getOrcTinyStripeThreshold; import static com.facebook.presto.hive.HiveSessionProperties.isOrcBloomFiltersEnabled; import static com.facebook.presto.hive.HiveUtil.getPhysicalHiveColumnHandles; -import static com.facebook.presto.hive.HiveUtil.isDeserializerClass; import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static com.facebook.presto.orc.OrcEncoding.ORC; import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE; @@ -104,12 +104,13 @@ public Optional createPageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, + Map tableParameters, List columns, TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone) { - if (!isDeserializerClass(schema, OrcSerde.class)) { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { return Optional.empty(); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java index 738dc10721482..53283210d7d35 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java @@ -19,6 +19,7 @@ import com.facebook.presto.hive.HiveColumnHandle; import com.facebook.presto.hive.HiveSelectivePageSourceFactory; import com.facebook.presto.hive.SubfieldExtractor; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.memory.context.AggregatedMemoryContext; import com.facebook.presto.orc.FilterFunction; import com.facebook.presto.orc.OrcDataSource; @@ -73,7 +74,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.function.Function; import java.util.stream.IntStream; @@ -89,7 +89,6 @@ import static com.facebook.presto.hive.HiveSessionProperties.getOrcTinyStripeThreshold; import static com.facebook.presto.hive.HiveSessionProperties.isOrcBloomFiltersEnabled; import static com.facebook.presto.hive.HiveUtil.getPhysicalHiveColumnHandles; -import static com.facebook.presto.hive.HiveUtil.isDeserializerClass; import static com.facebook.presto.hive.HiveUtil.typedPartitionKey; import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static com.facebook.presto.orc.OrcEncoding.ORC; @@ -144,7 +143,7 @@ public Optional createPageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, List columns, Map prefilledValues, List outputColumns, @@ -152,7 +151,7 @@ public Optional createPageSource( RowExpression remainingPredicate, DateTimeZone hiveStorageTimeZone) { - if (!isDeserializerClass(schema, OrcSerde.class)) { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { return Optional.empty(); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java index c770bb0a607ad..6ab687f601c67 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java @@ -35,7 +35,6 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Optional; -import java.util.Properties; import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; @@ -72,12 +71,10 @@ public ParquetPageSource( MessageType fileSchema, MessageColumnIO messageColumnIO, TypeManager typeManager, - Properties splitSchema, List columns, TupleDomain effectivePredicate, boolean useParquetColumnNames) { - requireNonNull(splitSchema, "splitSchema is null"); requireNonNull(columns, "columns is null"); requireNonNull(effectivePredicate, "effectivePredicate is null"); this.parquetReader = requireNonNull(parquetReader, "parquetReader is null"); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index bb608b4810ab4..729e442352048 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -17,6 +17,7 @@ import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.HiveBatchPageSourceFactory; import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.memory.context.AggregatedMemoryContext; import com.facebook.presto.parquet.ParquetCorruptionException; import com.facebook.presto.parquet.ParquetDataSource; @@ -59,7 +60,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Properties; import java.util.Set; import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR; @@ -70,7 +70,6 @@ import static com.facebook.presto.hive.HiveSessionProperties.getParquetMaxReadBlockSize; import static com.facebook.presto.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics; import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames; -import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName; import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource; import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO; @@ -127,12 +126,13 @@ public Optional createPageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, + Map tableParameters, List columns, TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone) { - if (!PARQUET_SERDE_CLASS_NAMES.contains(getDeserializerClassName(schema))) { + if (!PARQUET_SERDE_CLASS_NAMES.contains(storage.getStorageFormat().getSerDe())) { return Optional.empty(); } @@ -144,7 +144,6 @@ public Optional createPageSource( start, length, fileSize, - schema, columns, isUseParquetColumnNames(session), isFailOnCorruptedParquetStatistics(session), @@ -162,7 +161,6 @@ public static ParquetPageSource createParquetPageSource( long start, long length, long fileSize, - Properties schema, List columns, boolean useParquetColumnNames, boolean failOnCorruptedParquetStatistics, @@ -223,7 +221,6 @@ public static ParquetPageSource createParquetPageSource( fileSchema, messageColumnIO, typeManager, - schema, columns, effectivePredicate, useParquetColumnNames); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java index 8088d16f9478a..b7ba23b1190e8 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java @@ -17,6 +17,7 @@ import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.HiveBatchPageSourceFactory; import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.rcfile.AircompressorCodecFactory; import com.facebook.presto.rcfile.HadoopCodecFactory; import com.facebook.presto.rcfile.RcFileCorruptionException; @@ -49,13 +50,14 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; -import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName; +import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema; import static com.facebook.presto.rcfile.text.TextRcFileEncoding.DEFAULT_NULL_SEQUENCE; import static com.facebook.presto.rcfile.text.TextRcFileEncoding.DEFAULT_SEPARATORS; import static com.google.common.base.Strings.nullToEmpty; @@ -97,18 +99,18 @@ public Optional createPageSource( long start, long length, long fileSize, - Properties schema, + Storage storage, + Map tableParameters, List columns, TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone) { RcFileEncoding rcFileEncoding; - String deserializerClassName = getDeserializerClassName(schema); - if (deserializerClassName.equals(LazyBinaryColumnarSerDe.class.getName())) { + if (LazyBinaryColumnarSerDe.class.getName().equals(storage.getStorageFormat().getSerDe())) { rcFileEncoding = new BinaryRcFileEncoding(); } - else if (deserializerClassName.equals(ColumnarSerDe.class.getName())) { - rcFileEncoding = createTextVectorEncoding(schema, hiveStorageTimeZone); + else if (ColumnarSerDe.class.getName().equals(storage.getStorageFormat().getSerDe())) { + rcFileEncoding = createTextVectorEncoding(getHiveSchema(storage.getSerdeParameters(), tableParameters), hiveStorageTimeZone); } else { return Optional.empty(); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 35533c4dbb9ac..5bd3f46644ef0 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -191,8 +191,13 @@ import static com.facebook.presto.hive.HiveTestUtils.getTypes; import static com.facebook.presto.hive.HiveTestUtils.mapType; import static com.facebook.presto.hive.HiveTestUtils.rowType; +import static com.facebook.presto.hive.HiveType.HIVE_BOOLEAN; +import static com.facebook.presto.hive.HiveType.HIVE_BYTE; +import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE; +import static com.facebook.presto.hive.HiveType.HIVE_FLOAT; import static com.facebook.presto.hive.HiveType.HIVE_INT; import static com.facebook.presto.hive.HiveType.HIVE_LONG; +import static com.facebook.presto.hive.HiveType.HIVE_SHORT; import static com.facebook.presto.hive.HiveType.HIVE_STRING; import static com.facebook.presto.hive.HiveType.toHiveType; import static com.facebook.presto.hive.HiveUtil.columnExtraInfo; @@ -662,6 +667,8 @@ protected void setupHive(String connectorId, String databaseName, String timeZon invalidTableLayoutHandle = new HiveTableLayoutHandle( invalidTable, ImmutableList.of(), + ImmutableList.of(), + ImmutableMap.of(), ImmutableList.of(new HivePartition(invalidTable, "unknown", ImmutableMap.of())), TupleDomain.all(), TRUE_CONSTANT, @@ -677,7 +684,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon intColumn = new HiveColumnHandle("t_int", HIVE_INT, parseTypeSignature(StandardTypes.INTEGER), -1, PARTITION_KEY, Optional.empty()); invalidColumnHandle = new HiveColumnHandle(INVALID_COLUMN, HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 0, REGULAR, Optional.empty()); - List partitionColumns = ImmutableList.of(dsColumn, fileFormatColumn, dummyColumn); + List partitionColumns = ImmutableList.of(dsColumn, fileFormatColumn, dummyColumn); List partitions = ImmutableList.builder() .add(new HivePartition(tablePartitionFormat, "ds=2012-12-29/file_format=textfile/dummy=1", @@ -713,7 +720,27 @@ protected void setupHive(String connectorId, String databaseName, String timeZon TupleDomain domainPredicate = tupleDomain.transform(HiveColumnHandle.class::cast) .transform(column -> new Subfield(column.getName(), ImmutableList.of())); tableLayout = new ConnectorTableLayout( - new HiveTableLayoutHandle(tablePartitionFormat, partitionColumns, partitions, domainPredicate, TRUE_CONSTANT, ImmutableMap.of(dsColumn.getName(), dsColumn), tupleDomain, Optional.empty(), Optional.empty(), "layout"), + new HiveTableLayoutHandle( + tablePartitionFormat, + partitionColumns, + ImmutableList.of( + new Column("t_string", HIVE_STRING, Optional.empty()), + new Column("t_tinyint", HIVE_BYTE, Optional.empty()), + new Column("t_smallint", HIVE_SHORT, Optional.empty()), + new Column("t_int", HIVE_INT, Optional.empty()), + new Column("t_bigint", HIVE_LONG, Optional.empty()), + new Column("t_float", HIVE_FLOAT, Optional.empty()), + new Column("t_double", HIVE_DOUBLE, Optional.empty()), + new Column("t_boolean", HIVE_BOOLEAN, Optional.empty())), + ImmutableMap.of(), + partitions, + domainPredicate, + TRUE_CONSTANT, + ImmutableMap.of(dsColumn.getName(), dsColumn), + tupleDomain, + Optional.empty(), + Optional.empty(), + "layout"), Optional.empty(), TupleDomain.withColumnDomains(ImmutableMap.of( dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false), @@ -721,7 +748,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 1L), Range.equal(INTEGER, 2L), Range.equal(INTEGER, 3L), Range.equal(INTEGER, 4L)), false))), Optional.empty(), Optional.empty(), - Optional.of(new DiscretePredicates(partitionColumns, ImmutableList.of( + Optional.of(new DiscretePredicates(ImmutableList.copyOf(partitionColumns), ImmutableList.of( TupleDomain.withColumnDomains(ImmutableMap.of( dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false), fileFormatColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("textfile"))), false), @@ -740,7 +767,21 @@ protected void setupHive(String connectorId, String databaseName, String timeZon dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 4L)), false)))))), ImmutableList.of()); List unpartitionedPartitions = ImmutableList.of(new HivePartition(tableUnpartitioned)); - unpartitionedTableLayout = new ConnectorTableLayout(new HiveTableLayoutHandle(tableUnpartitioned, ImmutableList.of(), unpartitionedPartitions, TupleDomain.all(), TRUE_CONSTANT, ImmutableMap.of(), TupleDomain.all(), Optional.empty(), Optional.empty(), "layout")); + unpartitionedTableLayout = new ConnectorTableLayout(new HiveTableLayoutHandle( + tableUnpartitioned, + ImmutableList.of(), + ImmutableList.of( + new Column("t_string", HIVE_STRING, Optional.empty()), + new Column("t_tinyint", HIVE_BYTE, Optional.empty())), + ImmutableMap.of(), + unpartitionedPartitions, + TupleDomain.all(), + TRUE_CONSTANT, + ImmutableMap.of(), + TupleDomain.all(), + Optional.empty(), + Optional.empty(), + "layout")); timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(timeZoneId)); } @@ -1773,6 +1814,8 @@ private void doTestBucketedTableEvolutionWithDifferentReadCount(HiveStorageForma HiveTableLayoutHandle modifiedReadBucketCountLayoutHandle = new HiveTableLayoutHandle( layoutHandle.getSchemaTableName(), layoutHandle.getPartitionColumns(), + layoutHandle.getDataColumns(), + layoutHandle.getTableParameters(), layoutHandle.getPartitions().get(), layoutHandle.getDomainPredicate(), layoutHandle.getRemainingPredicate(), diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java index d248642b5f8b2..561a3d8743344 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Column; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.hive.metastore.StorageFormat; import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory; import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory; import com.facebook.presto.hive.parquet.ParquetPageSourceFactory; @@ -23,11 +26,11 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordPageSource; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.ArrayType; import com.facebook.presto.spi.type.RowType; import com.facebook.presto.testing.TestingConnectorSession; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -55,7 +58,6 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; -import java.util.Properties; import java.util.TimeZone; import java.util.stream.Collectors; @@ -83,15 +85,11 @@ import static com.facebook.presto.tests.StructuralTestUtil.arrayBlockOf; import static com.facebook.presto.tests.StructuralTestUtil.mapBlockOf; import static com.facebook.presto.tests.StructuralTestUtil.rowBlockOf; -import static com.google.common.base.Predicates.not; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.filter; -import static com.google.common.collect.Iterables.transform; import static io.airlift.slice.Slices.utf8Slice; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; -import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardListObjectInspector; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardMapObjectInspector; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector; @@ -703,17 +701,17 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider, ConnectorSession session, int rowCount) { - Properties splitProperties = new Properties(); - splitProperties.setProperty(FILE_INPUT_FORMAT, storageFormat.getInputFormat()); - splitProperties.setProperty(SERIALIZATION_LIB, storageFormat.getSerDe()); - splitProperties.setProperty("columns", Joiner.on(',').join(transform(filter(testColumns, not(TestColumn::isPartitionKey)), TestColumn::getName))); - splitProperties.setProperty("columns.types", Joiner.on(',').join(transform(filter(testColumns, not(TestColumn::isPartitionKey)), TestColumn::getType))); - List partitionKeys = testColumns.stream() .filter(TestColumn::isPartitionKey) .map(input -> new HivePartitionKey(input.getName(), (String) input.getWriteValue())) .collect(toList()); + List partitionKeyColumnHandles = getColumnHandles(testColumns.stream().filter(TestColumn::isPartitionKey).collect(toImmutableList())); + List tableDataColumns = testColumns.stream() + .filter(column -> !column.isPartitionKey()) + .map(column -> new Column(column.getName(), HiveType.valueOf(column.getType()), Optional.empty())) + .collect(toImmutableList()); + Configuration configuration = new Configuration(); configuration.set("io.compression.codecs", LzoCodec.class.getName() + "," + LzopCodec.class.getName()); Optional pageSource = HivePageSourceProvider.createHivePageSource( @@ -726,12 +724,22 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider, split.getStart(), split.getLength(), split.getLength(), - splitProperties, + new Storage( + StorageFormat.create(storageFormat.getSerDe(), storageFormat.getInputFormat(), storageFormat.getOutputFormat()), + "location", + Optional.empty(), + false, + ImmutableMap.of()), TupleDomain.all(), getColumnHandles(testColumns), partitionKeys, DateTimeZone.getDefault(), TYPE_MANAGER, + new SchemaTableName("schema", "table"), + partitionKeyColumnHandles, + tableDataColumns, + ImmutableMap.of(), + tableDataColumns.size(), ImmutableMap.of(), Optional.empty(), false); @@ -749,17 +757,17 @@ private void testPageSourceFactory(HiveBatchPageSourceFactory sourceFactory, int rowCount) throws IOException { - Properties splitProperties = new Properties(); - splitProperties.setProperty(FILE_INPUT_FORMAT, storageFormat.getInputFormat()); - splitProperties.setProperty(SERIALIZATION_LIB, storageFormat.getSerDe()); - splitProperties.setProperty("columns", Joiner.on(',').join(transform(filter(testColumns, not(TestColumn::isPartitionKey)), TestColumn::getName))); - splitProperties.setProperty("columns.types", Joiner.on(',').join(transform(filter(testColumns, not(TestColumn::isPartitionKey)), TestColumn::getType))); - List partitionKeys = testColumns.stream() .filter(TestColumn::isPartitionKey) .map(input -> new HivePartitionKey(input.getName(), (String) input.getWriteValue())) .collect(toList()); + List partitionKeyColumnHandles = getColumnHandles(testColumns.stream().filter(TestColumn::isPartitionKey).collect(toImmutableList())); + List tableDataColumns = testColumns.stream() + .filter(column -> !column.isPartitionKey()) + .map(column -> new Column(column.getName(), HiveType.valueOf(column.getType()), Optional.empty())) + .collect(toImmutableList()); + List columnHandles = getColumnHandles(testColumns); Optional pageSource = HivePageSourceProvider.createHivePageSource( @@ -772,12 +780,22 @@ private void testPageSourceFactory(HiveBatchPageSourceFactory sourceFactory, split.getStart(), split.getLength(), split.getLength(), - splitProperties, + new Storage( + StorageFormat.create(storageFormat.getSerDe(), storageFormat.getInputFormat(), storageFormat.getOutputFormat()), + "location", + Optional.empty(), + false, + ImmutableMap.of()), TupleDomain.all(), columnHandles, partitionKeys, DateTimeZone.getDefault(), TYPE_MANAGER, + new SchemaTableName("schema", "table"), + partitionKeyColumnHandles, + tableDataColumns, + ImmutableMap.of(), + tableDataColumns.size(), ImmutableMap.of(), Optional.empty(), false); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java index 706ffbd6b7ea7..328e26cda1efe 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java @@ -14,8 +14,11 @@ package com.facebook.presto.hive; import com.facebook.presto.GroupByHashPageIndexerFactory; +import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.hive.metastore.HivePageSinkMetadata; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.hive.metastore.StorageFormat; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorPageSink; @@ -34,7 +37,6 @@ import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.TestingConnectorSession; import com.facebook.presto.testing.TestingNodeManager; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -54,7 +56,6 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; -import java.util.Properties; import java.util.stream.Stream; import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR; @@ -83,6 +84,7 @@ import static com.facebook.presto.spi.type.IntegerType.INTEGER; import static com.facebook.presto.spi.type.VarcharType.createUnboundedVarcharType; import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; @@ -90,8 +92,6 @@ import static io.airlift.testing.Assertions.assertGreaterThan; import static java.lang.String.format; import static java.util.stream.Collectors.toList; -import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB; import static org.testng.Assert.assertTrue; public class TestHivePageSink @@ -216,11 +216,6 @@ public static MaterializedResult toMaterializedResult(ConnectorSession session, private static ConnectorPageSource createPageSource(HiveTransactionHandle transaction, HiveClientConfig config, File outputFile) { - Properties splitProperties = new Properties(); - splitProperties.setProperty(FILE_INPUT_FORMAT, config.getHiveStorageFormat().getInputFormat()); - splitProperties.setProperty(SERIALIZATION_LIB, config.getHiveStorageFormat().getSerDe()); - splitProperties.setProperty("columns", Joiner.on(',').join(getColumnHandles().stream().map(HiveColumnHandle::getName).collect(toList()))); - splitProperties.setProperty("columns.types", Joiner.on(',').join(getColumnHandles().stream().map(HiveColumnHandle::getHiveType).map(hiveType -> hiveType.getHiveTypeName().toString()).collect(toList()))); HiveSplit split = new HiveSplit( SCHEMA_NAME, TABLE_NAME, @@ -229,12 +224,18 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa 0, outputFile.length(), outputFile.length(), - splitProperties, + new Storage( + StorageFormat.create(config.getHiveStorageFormat().getSerDe(), config.getHiveStorageFormat().getInputFormat(), config.getHiveStorageFormat().getOutputFormat()), + "location", + Optional.empty(), + false, + ImmutableMap.of()), ImmutableList.of(), ImmutableList.of(), OptionalInt.empty(), OptionalInt.empty(), false, + getColumnHandles().size(), ImmutableMap.of(), Optional.empty(), false); @@ -245,6 +246,10 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa Optional.of(new HiveTableLayoutHandle( new SchemaTableName(SCHEMA_NAME, TABLE_NAME), ImmutableList.of(), + getColumnHandles().stream() + .map(column -> new Column(column.getName(), column.getHiveType(), Optional.empty())) + .collect(toImmutableList()), + ImmutableMap.of(), TupleDomain.all(), TRUE_CONSTANT, ImmutableMap.of(), diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java index 9956f28246150..8f540344fc5c8 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java @@ -16,6 +16,9 @@ import com.facebook.presto.block.BlockEncodingManager; import com.facebook.presto.block.BlockJsonSerde; import com.facebook.presto.hive.HiveColumnHandle.ColumnType; +import com.facebook.presto.hive.metastore.Column; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.hive.metastore.StorageFormat; import com.facebook.presto.metadata.HandleJsonModule; import com.facebook.presto.metadata.HandleResolver; import com.facebook.presto.spi.HostAddress; @@ -40,7 +43,6 @@ import java.util.Optional; import java.util.OptionalInt; -import java.util.Properties; import static com.facebook.presto.hive.HiveType.HIVE_LONG; import static com.facebook.presto.hive.HiveType.HIVE_STRING; @@ -57,10 +59,6 @@ public class TestHiveSplit public void testJsonRoundTrip() throws Exception { - Properties schema = new Properties(); - schema.setProperty("foo", "bar"); - schema.setProperty("bar", "baz"); - ImmutableList partitionKeys = ImmutableList.of(new HivePartitionKey("a", "apple"), new HivePartitionKey("b", "42")); ImmutableList addresses = ImmutableList.of(HostAddress.fromParts("127.0.0.1", 44), HostAddress.fromParts("127.0.0.1", 45)); HiveSplit expected = new HiveSplit( @@ -71,13 +69,19 @@ public void testJsonRoundTrip() 42, 87, 88, - schema, + new Storage( + StorageFormat.create("serde", "input", "output"), + "location", + Optional.empty(), + false, + ImmutableMap.of()), partitionKeys, addresses, OptionalInt.empty(), OptionalInt.empty(), true, - ImmutableMap.of(1, HIVE_STRING), + 10, + ImmutableMap.of(1, new Column("name", HIVE_STRING, Optional.empty())), Optional.of(new HiveSplit.BucketConversion( 32, 16, @@ -95,10 +99,11 @@ public void testJsonRoundTrip() assertEquals(actual.getStart(), expected.getStart()); assertEquals(actual.getLength(), expected.getLength()); assertEquals(actual.getFileSize(), expected.getFileSize()); - assertEquals(actual.getSchema(), expected.getSchema()); + assertEquals(actual.getStorage(), expected.getStorage()); assertEquals(actual.getPartitionKeys(), expected.getPartitionKeys()); assertEquals(actual.getAddresses(), expected.getAddresses()); - assertEquals(actual.getColumnCoercions(), expected.getColumnCoercions()); + assertEquals(actual.getPartitionDataColumnCount(), expected.getPartitionDataColumnCount()); + assertEquals(actual.getPartitionSchemaDifference(), expected.getPartitionSchemaDifference()); assertEquals(actual.getBucketConversion(), expected.getBucketConversion()); assertEquals(actual.isForceLocalScheduling(), expected.isForceLocalScheduling()); assertEquals(actual.isS3SelectPushdownEnabled(), expected.isS3SelectPushdownEnabled()); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java index 20f8927669e48..0f829f3e828b6 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.hive.metastore.StorageFormat; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.PrestoException; @@ -27,7 +29,6 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -183,7 +184,7 @@ public void testReaderWaitsForSplits() // wait for thread to get the split ConnectorSplit split = splits.get(10, SECONDS); - assertEquals(((HiveSplit) split).getSchema().getProperty("id"), "33"); + assertEquals(((HiveSplit) split).getPartitionDataColumnCount(), 33); } finally { // make sure the thread exits @@ -301,7 +302,7 @@ public void testPreloadSplitsForRewindableSplitSource() connectorSplits = getSplits(hiveSplitSource, OptionalInt.of(0), 10); for (int i = 0; i < 10; i++) { - assertEquals(((HiveSplit) connectorSplits.get(i)).getSchema().getProperty("id"), Integer.toString(i)); + assertEquals(((HiveSplit) connectorSplits.get(i)).getPartitionDataColumnCount(), i); } assertTrue(hiveSplitSource.isFinished()); } @@ -427,14 +428,19 @@ private TestSplit(int id, OptionalInt bucketNumber) true, false, false, - new HiveSplitPartitionInfo(properties("id", String.valueOf(id)), new Path("path").toUri(), ImmutableList.of(), "partition-name", ImmutableMap.of(), Optional.empty())); - } - - private static Properties properties(String key, String value) - { - Properties properties = new Properties(); - properties.put(key, value); - return properties; + new HiveSplitPartitionInfo( + new Storage( + StorageFormat.create("serde", "input", "output"), + "location", + Optional.empty(), + false, + ImmutableMap.of()), + new Path("path").toUri(), + ImmutableList.of(), + "partition-name", + id, + ImmutableMap.of(), + Optional.empty())); } } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java index 4feb75808ccde..43c57fe8c4fbc 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.hive; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.hive.metastore.StorageFormat; import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.metadata.Split; @@ -29,6 +31,7 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; @@ -91,7 +94,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Supplier; -import java.util.stream.Collectors; import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR; @@ -114,9 +116,7 @@ import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.stream.Collectors.toList; -import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.ZLIB; -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector; import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector; import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS_CODEC; @@ -403,7 +403,7 @@ public void testScanFilterAndProjectOperator() private class TestPreparer { private final FileSplit fileSplit; - private final Properties schema; + private final Storage storage; private final TableHandle table; private final List columns; private final List types; @@ -421,17 +421,12 @@ public TestPreparer(String tempFilePath, List testColumns, int numRo throws Exception { OrcSerde serde = new OrcSerde(); - schema = new Properties(); - schema.setProperty("columns", - testColumns.stream() - .map(TestColumn::getName) - .collect(Collectors.joining(","))); - schema.setProperty("columns.types", - testColumns.stream() - .map(TestColumn::getType) - .collect(Collectors.joining(","))); - schema.setProperty(FILE_INPUT_FORMAT, OrcInputFormat.class.getName()); - schema.setProperty(SERIALIZATION_LIB, serde.getClass().getName()); + storage = new Storage( + StorageFormat.create(serde.getClass().getName(), OrcInputFormat.class.getName(), OrcOutputFormat.class.getName()), + "location", + Optional.empty(), + false, + ImmutableMap.of()); partitionKeys = testColumns.stream() .filter(TestColumn::isPartitionKey) @@ -487,12 +482,17 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec fileSplit.getStart(), fileSplit.getLength(), fileSplit.getLength(), - schema, + storage, TupleDomain.all(), columns, partitionKeys, DateTimeZone.UTC, TYPE_MANAGER, + new SchemaTableName("schema", "table"), + ImmutableList.of(), + ImmutableList.of(), + ImmutableMap.of(), + 0, ImmutableMap.of(), Optional.empty(), false) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java index 6091a4187f78e..728fb89269bda 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java @@ -27,6 +27,8 @@ import com.facebook.presto.hive.RecordFileWriter; import com.facebook.presto.hive.TypeTranslator; import com.facebook.presto.hive.benchmark.HiveFileFormatBenchmark.TestData; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.hive.metastore.StorageFormat; import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory; import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory; import com.facebook.presto.hive.parquet.ParquetPageSourceFactory; @@ -400,7 +402,13 @@ private static ConnectorPageSource createPageSource( 0, targetFile.length(), targetFile.length(), - createSchema(format, columnNames, columnTypes), + new Storage( + StorageFormat.create(format.getSerDe(), format.getInputFormat(), format.getOutputFormat()), + "location", + Optional.empty(), + false, + ImmutableMap.of()), + ImmutableMap.of(), columnHandles, TupleDomain.all(), DateTimeZone.forID(session.getTimeZoneKey().getId())) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/metastore/TestMetastoreUtil.java b/presto-hive/src/test/java/com/facebook/presto/hive/metastore/TestMetastoreUtil.java index 981f6ebc11707..81f4163c65490 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/metastore/TestMetastoreUtil.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/metastore/TestMetastoreUtil.java @@ -27,8 +27,15 @@ import org.testng.annotations.Test; import java.util.List; +import java.util.Optional; import java.util.Properties; +import static com.facebook.presto.hive.HiveType.HIVE_DATE; +import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE; +import static com.facebook.presto.hive.HiveType.HIVE_INT; +import static com.facebook.presto.hive.HiveType.HIVE_STRING; +import static com.facebook.presto.hive.metastore.MetastoreUtil.reconstructPartitionSchema; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; public class TestMetastoreUtil @@ -164,4 +171,27 @@ public void testPartitionRoundTripUnsupported() Partition partition = ThriftMetastoreUtil.fromMetastoreApiPartition(TEST_PARTITION_WITH_UNSUPPORTED_FIELDS); ThriftMetastoreUtil.toMetastoreApiPartition(partition); } + + @Test + public void testReconstructPartitionSchema() + { + Column c1 = new Column("_c1", HIVE_STRING, Optional.empty()); + Column c2 = new Column("_c2", HIVE_INT, Optional.empty()); + Column c3 = new Column("_c3", HIVE_DOUBLE, Optional.empty()); + Column c4 = new Column("_c4", HIVE_DATE, Optional.empty()); + + assertEquals(reconstructPartitionSchema(ImmutableList.of(), 0, ImmutableMap.of()), ImmutableList.of()); + assertEquals(reconstructPartitionSchema(ImmutableList.of(c1), 0, ImmutableMap.of()), ImmutableList.of()); + assertEquals(reconstructPartitionSchema(ImmutableList.of(c1), 1, ImmutableMap.of()), ImmutableList.of(c1)); + assertEquals(reconstructPartitionSchema(ImmutableList.of(c1, c2), 1, ImmutableMap.of()), ImmutableList.of(c1)); + assertEquals(reconstructPartitionSchema(ImmutableList.of(c1, c2), 3, ImmutableMap.of(2, c3)), ImmutableList.of(c1, c2, c3)); + assertEquals(reconstructPartitionSchema(ImmutableList.of(c1, c2, c3), 3, ImmutableMap.of(1, c4)), ImmutableList.of(c1, c4, c3)); + + assertThatThrownBy(() -> reconstructPartitionSchema(ImmutableList.of(), 1, ImmutableMap.of())) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> reconstructPartitionSchema(ImmutableList.of(c1), 2, ImmutableMap.of())) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> reconstructPartitionSchema(ImmutableList.of(c1), 2, ImmutableMap.of(0, c2))) + .isInstanceOf(IllegalArgumentException.class); + } }