diff --git a/presto-hive-hadoop2/src/test/java/io/prestosql/plugin/hive/TestHiveAlluxioMetastore.java b/presto-hive-hadoop2/src/test/java/io/prestosql/plugin/hive/TestHiveAlluxioMetastore.java
index ee7441d95d59..c179fde223d3 100644
--- a/presto-hive-hadoop2/src/test/java/io/prestosql/plugin/hive/TestHiveAlluxioMetastore.java
+++ b/presto-hive-hadoop2/src/test/java/io/prestosql/plugin/hive/TestHiveAlluxioMetastore.java
@@ -210,6 +210,12 @@ public void testPartitionStatisticsSampling()
// Alluxio metastore does not support create operations
}
+ @Override
+ public void testApplyProjection()
+ {
+ // Alluxio metastore does not support create/delete operations
+ }
+
@Override
public void testPreferredInsertLayout()
{
diff --git a/presto-hive/pom.xml b/presto-hive/pom.xml
index 0bb54a2e850d..3d47b575d16b 100644
--- a/presto-hive/pom.xml
+++ b/presto-hive/pom.xml
@@ -245,6 +245,13 @@
test
+
+ io.prestosql
+ presto-main
+ test-jar
+ test
+
+
io.prestosql
presto-main
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/GenericHiveRecordCursorProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/GenericHiveRecordCursorProvider.java
index 52baea41cfb7..87da5d6b34b5 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/GenericHiveRecordCursorProvider.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/GenericHiveRecordCursorProvider.java
@@ -36,6 +36,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
+import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
@@ -59,7 +60,7 @@ public GenericHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment, DataSize
}
@Override
- public Optional<RecordCursor> createRecordCursor(
+ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
Configuration configuration,
ConnectorSession session,
Path path,
@@ -83,18 +84,32 @@ public Optional createRecordCursor(
throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + path, e);
}
- return hdfsEnvironment.doAs(session.getUser(), () -> {
- RecordReader<?, ?> recordReader = HiveUtil.createRecordReader(configuration, path, start, length, schema, columns);
+ Optional<ReaderProjections> projectedReaderColumns = projectBaseColumns(columns);
- return Optional.of(new GenericHiveRecordCursor<>(
+ RecordCursor cursor = hdfsEnvironment.doAs(session.getUser(), () -> {
+ RecordReader<?, ?> recordReader = HiveUtil.createRecordReader(
+ configuration,
+ path,
+ start,
+ length,
+ schema,
+ projectedReaderColumns
+ .map(ReaderProjections::getReaderColumns)
+ .orElse(columns));
+
+ return new GenericHiveRecordCursor<>(
configuration,
path,
genericRecordReader(recordReader),
length,
schema,
- columns,
- hiveStorageTimeZone));
+ projectedReaderColumns
+ .map(ReaderProjections::getReaderColumns)
+ .orElse(columns),
+ hiveStorageTimeZone);
});
+
+ return Optional.of(new ReaderRecordCursorWithProjections(cursor, projectedReaderColumns));
}
@SuppressWarnings("unchecked")
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveApplyProjectionUtil.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveApplyProjectionUtil.java
new file mode 100644
index 000000000000..13c7413eada9
--- /dev/null
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveApplyProjectionUtil.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.hive;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.expression.ConnectorExpression;
+import io.prestosql.spi.expression.FieldDereference;
+import io.prestosql.spi.expression.Variable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+final class HiveApplyProjectionUtil
+{
+ private HiveApplyProjectionUtil() {}
+
+ public static List<ConnectorExpression> extractSupportedProjectedColumns(ConnectorExpression expression)
+ {
+ requireNonNull(expression, "expression is null");
+ ImmutableList.Builder<ConnectorExpression> supportedSubExpressions = ImmutableList.builder();
+ fillSupportedProjectedColumns(expression, supportedSubExpressions);
+ return supportedSubExpressions.build();
+ }
+
+ private static void fillSupportedProjectedColumns(ConnectorExpression expression, ImmutableList.Builder<ConnectorExpression> supportedSubExpressions)
+ {
+ if (isPushDownSupported(expression)) {
+ supportedSubExpressions.add(expression);
+ return;
+ }
+
+ // If the whole expression is not supported, look for a partially supported projection
+ if (expression instanceof FieldDereference) {
+ fillSupportedProjectedColumns(((FieldDereference) expression).getTarget(), supportedSubExpressions);
+ }
+ }
+
+ @VisibleForTesting
+ static boolean isPushDownSupported(ConnectorExpression expression)
+ {
+ return expression instanceof Variable ||
+ (expression instanceof FieldDereference && isPushDownSupported(((FieldDereference) expression).getTarget()));
+ }
+
+ public static ProjectedColumnRepresentation createProjectedColumnRepresentation(ConnectorExpression expression)
+ {
+ ImmutableList.Builder<Integer> ordinals = ImmutableList.builder();
+
+ Variable target;
+ while (true) {
+ if (expression instanceof Variable) {
+ target = (Variable) expression;
+ break;
+ }
+ else if (expression instanceof FieldDereference) {
+ FieldDereference dereference = (FieldDereference) expression;
+ ordinals.add(dereference.getField());
+ expression = dereference.getTarget();
+ }
+ else {
+ throw new IllegalArgumentException("expression is not a valid dereference chain");
+ }
+ }
+
+ return new ProjectedColumnRepresentation(target, ordinals.build().reverse());
+ }
+
+ /**
+ * Replace all connector expressions with variables as given by {@param expressionToVariableMappings} in a top down manner.
+ * i.e. if the replacement occurs for the parent, the children will not be visited.
+ */
+ public static ConnectorExpression replaceWithNewVariables(ConnectorExpression expression, Map<ConnectorExpression, Variable> expressionToVariableMappings)
+ {
+ if (expressionToVariableMappings.containsKey(expression)) {
+ return expressionToVariableMappings.get(expression);
+ }
+
+ if (expression instanceof FieldDereference) {
+ ConnectorExpression newTarget = replaceWithNewVariables(((FieldDereference) expression).getTarget(), expressionToVariableMappings);
+ return new FieldDereference(expression.getType(), newTarget, ((FieldDereference) expression).getField());
+ }
+
+ return expression;
+ }
+
+ /**
+ * Returns the assignment key corresponding to the column represented by {@param projectedColumn} in the {@param assignments}, if one exists.
+ * The variable in the {@param projectedColumn} can itself be a representation of another projected column. For example,
+ * say a projected column representation has variable "x" and a dereferenceIndices=[0]. "x" can in-turn map to a projected
+ * column handle with base="a" and [1, 2] as dereference indices. Then the method searches for a column handle in
+ * {@param assignments} with base="a" and dereferenceIndices=[1, 2, 0].
+ */
+ public static Optional<String> find(Map<String, ColumnHandle> assignments, ProjectedColumnRepresentation projectedColumn)
+ {
+ HiveColumnHandle variableColumn = (HiveColumnHandle) assignments.get(projectedColumn.getVariable().getName());
+
+ if (variableColumn == null) {
+ return Optional.empty();
+ }
+
+ String baseColumnName = variableColumn.getBaseColumnName();
+
+ List<Integer> variableColumnIndices = variableColumn.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceIndices)
+ .orElse(ImmutableList.of());
+
+ List<Integer> projectionIndices = ImmutableList.<Integer>builder()
+ .addAll(variableColumnIndices)
+ .addAll(projectedColumn.getDereferenceIndices())
+ .build();
+
+ for (Map.Entry<String, ColumnHandle> entry : assignments.entrySet()) {
+ HiveColumnHandle column = (HiveColumnHandle) entry.getValue();
+ if (column.getBaseColumnName().equals(baseColumnName) &&
+ column.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceIndices)
+ .orElse(ImmutableList.of())
+ .equals(projectionIndices)) {
+ return Optional.of(entry.getKey());
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ public static class ProjectedColumnRepresentation
+ {
+ private final Variable variable;
+ private final List<Integer> dereferenceIndices;
+
+ public ProjectedColumnRepresentation(Variable variable, List<Integer> dereferenceIndices)
+ {
+ this.variable = requireNonNull(variable, "variable is null");
+ this.dereferenceIndices = ImmutableList.copyOf(requireNonNull(dereferenceIndices, "dereferenceIndices is null"));
+ }
+
+ public Variable getVariable()
+ {
+ return variable;
+ }
+
+ public List<Integer> getDereferenceIndices()
+ {
+ return dereferenceIndices;
+ }
+
+ public boolean isVariable()
+ {
+ return dereferenceIndices.isEmpty();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+ ProjectedColumnRepresentation that = (ProjectedColumnRepresentation) obj;
+ return Objects.equals(variable, that.variable) &&
+ Objects.equals(dereferenceIndices, that.dereferenceIndices);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(variable, dereferenceIndices);
+ }
+ }
+}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveBucketHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveBucketHandle.java
index c8cded6199b7..17a53dc36fd9 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveBucketHandle.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveBucketHandle.java
@@ -20,6 +20,8 @@
import java.util.List;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
@@ -41,6 +43,7 @@ public HiveBucketHandle(
@JsonProperty("readBucketCount") int readBucketCount)
{
this.columns = requireNonNull(columns, "columns is null");
+ columns.forEach(column -> checkArgument(column.isBaseColumn(), format("projected column %s is not allowed for bucketing", column)));
this.bucketingVersion = requireNonNull(bucketingVersion, "bucketingVersion is null");
this.tableBucketCount = tableBucketCount;
this.readBucketCount = readBucketCount;
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveCoercionRecordCursor.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveCoercionRecordCursor.java
index d071745d4b2d..37d94e48263a 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveCoercionRecordCursor.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveCoercionRecordCursor.java
@@ -74,8 +74,8 @@ public HiveCoercionRecordCursor(
for (int columnIndex = 0; columnIndex < size; columnIndex++) {
ColumnMapping columnMapping = columnMappings.get(columnIndex);
- if (columnMapping.getCoercionFrom().isPresent()) {
- coercers[columnIndex] = createCoercer(typeManager, columnMapping.getCoercionFrom().get(), columnMapping.getHiveColumnHandle().getHiveType(), bridgingRecordCursor);
+ if (columnMapping.getBaseTypeCoercionFrom().isPresent()) {
+ coercers[columnIndex] = createCoercer(typeManager, columnMapping.getBaseTypeCoercionFrom().get(), columnMapping.getHiveColumnHandle().getHiveType(), bridgingRecordCursor);
}
}
}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnHandle.java
index 2a70868c58a2..69e8345e6de4 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnHandle.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnHandle.java
@@ -33,6 +33,11 @@
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
+/**
+ * ColumnHandle for Hive Connector representing a full top level column or a projected column. Currently projected columns
+ * that represent a simple chain of dereferences are supported. e.g. for a column "A" with type struct(B struct(C bigint, ...), ....)
+ * there can be a projected column representing expression "A.B.C".
+ */
public class HiveColumnHandle
implements ColumnHandle
{
@@ -65,47 +70,102 @@ public enum ColumnType
SYNTHESIZED,
}
+ // Information about top level hive column
+ private final String baseColumnName;
+ private final int baseHiveColumnIndex;
+ private final HiveType baseHiveType;
+ private final Type baseType;
+ private final Optional<String> comment;
+
+ // Information about parts of the base column to be referenced by this column handle.
+ private final Optional<HiveColumnProjectionInfo> hiveColumnProjectionInfo;
+
private final String name;
- private final HiveType hiveType;
- private final Type type;
- private final int hiveColumnIndex;
private final ColumnType columnType;
- private final Optional comment;
@JsonCreator
public HiveColumnHandle(
- @JsonProperty("name") String name,
- @JsonProperty("hiveType") HiveType hiveType,
- @JsonProperty("type") Type type,
- @JsonProperty("hiveColumnIndex") int hiveColumnIndex,
+ @JsonProperty("baseColumnName") String baseColumnName,
+ @JsonProperty("baseHiveColumnIndex") int baseHiveColumnIndex,
+ @JsonProperty("baseHiveType") HiveType baseHiveType,
+ @JsonProperty("baseType") Type baseType,
+ @JsonProperty("hiveColumnProjectionInfo") Optional<HiveColumnProjectionInfo> hiveColumnProjectionInfo,
@JsonProperty("columnType") ColumnType columnType,
@JsonProperty("comment") Optional<String> comment)
{
- this.name = requireNonNull(name, "name is null");
- checkArgument(hiveColumnIndex >= 0 || columnType == PARTITION_KEY || columnType == SYNTHESIZED, "hiveColumnIndex is negative");
- this.hiveColumnIndex = hiveColumnIndex;
- this.hiveType = requireNonNull(hiveType, "hiveType is null");
- this.type = requireNonNull(type, "type is null");
+ this.baseColumnName = requireNonNull(baseColumnName, "baseColumnName is null");
+ checkArgument(baseHiveColumnIndex >= 0 || columnType == PARTITION_KEY || columnType == SYNTHESIZED, "baseHiveColumnIndex is negative");
+ this.baseHiveColumnIndex = baseHiveColumnIndex;
+ this.baseHiveType = requireNonNull(baseHiveType, "baseHiveType is null");
+ this.baseType = requireNonNull(baseType, "baseType is null");
+
+ this.hiveColumnProjectionInfo = requireNonNull(hiveColumnProjectionInfo, "hiveColumnProjectionInfo is null");
+
+ this.name = this.baseColumnName + hiveColumnProjectionInfo.map(HiveColumnProjectionInfo::getPartialName).orElse("");
+
this.columnType = requireNonNull(columnType, "columnType is null");
this.comment = requireNonNull(comment, "comment is null");
}
- @JsonProperty
+ public static HiveColumnHandle createBaseColumn(
+ String topLevelColumnName,
+ int topLevelColumnIndex,
+ HiveType hiveType,
+ Type type,
+ ColumnType columnType,
+ Optional<String> comment)
+ {
+ return new HiveColumnHandle(topLevelColumnName, topLevelColumnIndex, hiveType, type, Optional.empty(), columnType, comment);
+ }
+
+ public HiveColumnHandle getBaseColumn()
+ {
+ return isBaseColumn() ? this : createBaseColumn(baseColumnName, baseHiveColumnIndex, baseHiveType, baseType, columnType, comment);
+ }
+
public String getName()
{
return name;
}
@JsonProperty
- public HiveType getHiveType()
+ public String getBaseColumnName()
+ {
+ return baseColumnName;
+ }
+
+ @JsonProperty
+ public HiveType getBaseHiveType()
+ {
+ return baseHiveType;
+ }
+
+ @JsonProperty
+ public Type getBaseType()
{
- return hiveType;
+ return baseType;
}
@JsonProperty
- public int getHiveColumnIndex()
+ public int getBaseHiveColumnIndex()
+ {
+ return baseHiveColumnIndex;
+ }
+
+ @JsonProperty
+ public Optional<HiveColumnProjectionInfo> getHiveColumnProjectionInfo()
+ {
+ return hiveColumnProjectionInfo;
+ }
+
+ public HiveType getHiveType()
{
- return hiveColumnIndex;
+ return hiveColumnProjectionInfo.map(HiveColumnProjectionInfo::getHiveType).orElse(baseHiveType);
+ }
+
+ public Type getType()
+ {
+ return hiveColumnProjectionInfo.map(HiveColumnProjectionInfo::getType).orElse(baseType);
}
public boolean isPartitionKey()
@@ -122,7 +182,7 @@ public ColumnMetadata getColumnMetadata()
{
return ColumnMetadata.builder()
.setName(name)
- .setType(type)
+ .setType(getType())
.setHidden(isHidden())
.build();
}
@@ -134,21 +194,20 @@ public Optional getComment()
}
@JsonProperty
- public Type getType()
+ public ColumnType getColumnType()
{
- return type;
+ return columnType;
}
- @JsonProperty
- public ColumnType getColumnType()
+ public boolean isBaseColumn()
{
- return columnType;
+ return !hiveColumnProjectionInfo.isPresent();
}
@Override
public int hashCode()
{
- return Objects.hash(name, hiveColumnIndex, hiveType, columnType, comment);
+ return Objects.hash(baseColumnName, baseHiveColumnIndex, baseHiveType, baseType, hiveColumnProjectionInfo, columnType, comment);
}
@Override
@@ -161,9 +220,12 @@ public boolean equals(Object obj)
return false;
}
HiveColumnHandle other = (HiveColumnHandle) obj;
- return Objects.equals(this.name, other.name) &&
- Objects.equals(this.hiveColumnIndex, other.hiveColumnIndex) &&
- Objects.equals(this.hiveType, other.hiveType) &&
+ return Objects.equals(this.baseColumnName, other.baseColumnName) &&
+ Objects.equals(this.baseHiveColumnIndex, other.baseHiveColumnIndex) &&
+ Objects.equals(this.baseHiveType, other.baseHiveType) &&
+ Objects.equals(this.baseType, other.baseType) &&
+ Objects.equals(this.hiveColumnProjectionInfo, other.hiveColumnProjectionInfo) &&
+ Objects.equals(this.name, other.name) &&
this.columnType == other.columnType &&
Objects.equals(this.comment, other.comment);
}
@@ -171,7 +233,7 @@ public boolean equals(Object obj)
@Override
public String toString()
{
- return name + ":" + hiveType + ":" + hiveColumnIndex + ":" + columnType;
+ return name + ":" + getHiveType() + ":" + columnType;
}
public static HiveColumnHandle updateRowIdHandle()
@@ -182,12 +244,12 @@ public static HiveColumnHandle updateRowIdHandle()
// plan-time support for row-by-row delete so that planning doesn't fail. This is why we need
// rowid handle. Note that in Hive connector, rowid handle is not implemented beyond plan-time.
- return new HiveColumnHandle(UPDATE_ROW_ID_COLUMN_NAME, HIVE_LONG, BIGINT, -1, SYNTHESIZED, Optional.empty());
+ return createBaseColumn(UPDATE_ROW_ID_COLUMN_NAME, -1, HIVE_LONG, BIGINT, SYNTHESIZED, Optional.empty());
}
public static HiveColumnHandle pathColumnHandle()
{
- return new HiveColumnHandle(PATH_COLUMN_NAME, PATH_HIVE_TYPE, PATH_TYPE, PATH_COLUMN_INDEX, SYNTHESIZED, Optional.empty());
+ return createBaseColumn(PATH_COLUMN_NAME, PATH_COLUMN_INDEX, PATH_HIVE_TYPE, PATH_TYPE, SYNTHESIZED, Optional.empty());
}
/**
@@ -197,36 +259,36 @@ public static HiveColumnHandle pathColumnHandle()
*/
public static HiveColumnHandle bucketColumnHandle()
{
- return new HiveColumnHandle(BUCKET_COLUMN_NAME, BUCKET_HIVE_TYPE, BUCKET_TYPE_SIGNATURE, BUCKET_COLUMN_INDEX, SYNTHESIZED, Optional.empty());
+ return createBaseColumn(BUCKET_COLUMN_NAME, BUCKET_COLUMN_INDEX, BUCKET_HIVE_TYPE, BUCKET_TYPE_SIGNATURE, SYNTHESIZED, Optional.empty());
}
public static HiveColumnHandle fileSizeColumnHandle()
{
- return new HiveColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, FILE_SIZE_TYPE_SIGNATURE, FILE_SIZE_COLUMN_INDEX, SYNTHESIZED, Optional.empty());
+ return createBaseColumn(FILE_SIZE_COLUMN_NAME, FILE_SIZE_COLUMN_INDEX, FILE_SIZE_TYPE, FILE_SIZE_TYPE_SIGNATURE, SYNTHESIZED, Optional.empty());
}
public static HiveColumnHandle fileModifiedTimeColumnHandle()
{
- return new HiveColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, FILE_MODIFIED_TIME_TYPE_SIGNATURE, FILE_MODIFIED_TIME_COLUMN_INDEX, SYNTHESIZED, Optional.empty());
+ return createBaseColumn(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_INDEX, FILE_MODIFIED_TIME_TYPE, FILE_MODIFIED_TIME_TYPE_SIGNATURE, SYNTHESIZED, Optional.empty());
}
public static boolean isPathColumnHandle(HiveColumnHandle column)
{
- return column.getHiveColumnIndex() == PATH_COLUMN_INDEX;
+ return column.getBaseHiveColumnIndex() == PATH_COLUMN_INDEX;
}
public static boolean isBucketColumnHandle(HiveColumnHandle column)
{
- return column.getHiveColumnIndex() == BUCKET_COLUMN_INDEX;
+ return column.getBaseHiveColumnIndex() == BUCKET_COLUMN_INDEX;
}
public static boolean isFileSizeColumnHandle(HiveColumnHandle column)
{
- return column.getHiveColumnIndex() == FILE_SIZE_COLUMN_INDEX;
+ return column.getBaseHiveColumnIndex() == FILE_SIZE_COLUMN_INDEX;
}
public static boolean isFileModifiedTimeColumnHandle(HiveColumnHandle column)
{
- return column.getHiveColumnIndex() == FILE_MODIFIED_TIME_COLUMN_INDEX;
+ return column.getBaseHiveColumnIndex() == FILE_MODIFIED_TIME_COLUMN_INDEX;
}
}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnProjectionInfo.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnProjectionInfo.java
new file mode 100644
index 000000000000..6cac57ddf80d
--- /dev/null
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnProjectionInfo.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.hive;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.prestosql.spi.type.Type;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class HiveColumnProjectionInfo
+{
+ private final List dereferenceIndices;
+ private final List dereferenceNames;
+ private final HiveType hiveType;
+ private final Type type;
+ private final String partialName;
+
+ @JsonCreator
+ public HiveColumnProjectionInfo(
+ @JsonProperty("dereferenceIndices") List dereferenceIndices,
+ @JsonProperty("dereferenceNames") List dereferenceNames,
+ @JsonProperty("hiveType") HiveType hiveType,
+ @JsonProperty("type") Type type)
+ {
+ this.dereferenceIndices = requireNonNull(dereferenceIndices, "dereferenceIndices is null");
+ this.dereferenceNames = requireNonNull(dereferenceNames, "dereferenceNames is null");
+ checkArgument(dereferenceIndices.size() > 0, "dereferenceIndices should not be empty");
+ checkArgument(dereferenceIndices.size() == dereferenceNames.size(), "dereferenceIndices and dereferenceNames should have the same sizes");
+
+ this.hiveType = requireNonNull(hiveType, "hiveType is null");
+ this.type = requireNonNull(type, "type is null");
+
+ this.partialName = generatePartialName(dereferenceNames);
+ }
+
+ public String getPartialName()
+ {
+ return partialName;
+ }
+
+ @JsonProperty
+ public List getDereferenceIndices()
+ {
+ return dereferenceIndices;
+ }
+
+ @JsonProperty
+ public List getDereferenceNames()
+ {
+ return dereferenceNames;
+ }
+
+ @JsonProperty
+ public HiveType getHiveType()
+ {
+ return hiveType;
+ }
+
+ @JsonProperty
+ public Type getType()
+ {
+ return type;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(dereferenceIndices, dereferenceNames, hiveType, type);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ HiveColumnProjectionInfo other = (HiveColumnProjectionInfo) obj;
+ return Objects.equals(this.dereferenceIndices, other.dereferenceIndices) &&
+ Objects.equals(this.dereferenceNames, other.dereferenceNames) &&
+ Objects.equals(this.hiveType, other.hiveType) &&
+ Objects.equals(this.type, other.type);
+ }
+
+ public static String generatePartialName(List dereferenceNames)
+ {
+ return dereferenceNames.stream()
+ .map(name -> "#" + name)
+ .collect(Collectors.joining());
+ }
+}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java
index b7b8cd072205..74317423f46b 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java
@@ -29,6 +29,7 @@
import io.airlift.slice.Slice;
import io.prestosql.plugin.base.CatalogName;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
+import io.prestosql.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
import io.prestosql.plugin.hive.LocationService.WriteInfo;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.Column;
@@ -65,11 +66,15 @@
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.DiscretePredicates;
import io.prestosql.spi.connector.InMemoryRecordSet;
+import io.prestosql.spi.connector.ProjectionApplicationResult;
+import io.prestosql.spi.connector.ProjectionApplicationResult.Assignment;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.SystemTable;
import io.prestosql.spi.connector.TableNotFoundException;
import io.prestosql.spi.connector.ViewNotFoundException;
+import io.prestosql.spi.expression.ConnectorExpression;
+import io.prestosql.spi.expression.Variable;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.NullableValue;
import io.prestosql.spi.predicate.TupleDomain;
@@ -101,6 +106,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -126,6 +132,9 @@
import static com.google.common.collect.Streams.stream;
import static io.prestosql.plugin.hive.HiveAnalyzeProperties.getColumnNames;
import static io.prestosql.plugin.hive.HiveAnalyzeProperties.getPartitionList;
+import static io.prestosql.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns;
+import static io.prestosql.plugin.hive.HiveApplyProjectionUtil.find;
+import static io.prestosql.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables;
import static io.prestosql.plugin.hive.HiveBasicStatistics.createEmptyStatistics;
import static io.prestosql.plugin.hive.HiveBasicStatistics.createZeroStatistics;
import static io.prestosql.plugin.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
@@ -135,6 +144,7 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME;
+import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.prestosql.plugin.hive.HiveColumnHandle.updateRowIdHandle;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
@@ -1916,6 +1926,100 @@ public void validateScan(ConnectorSession session, ConnectorTableHandle tableHan
}
}
+ @Override
+ public Optional> applyProjection(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ List projections,
+ Map assignments)
+ {
+ // Create projected column representations for supported sub expressions. Simple column references and chain of
+ // dereferences on a variable are supported right now.
+ Set projectedExpressions = projections.stream()
+ .flatMap(expression -> extractSupportedProjectedColumns(expression).stream())
+ .collect(toImmutableSet());
+
+ Map columnProjections = projectedExpressions.stream()
+ .collect(toImmutableMap(Function.identity(), HiveApplyProjectionUtil::createProjectedColumnRepresentation));
+
+ // No pushdown required if all references are simple variables
+ if (columnProjections.values().stream().allMatch(ProjectedColumnRepresentation::isVariable)) {
+ return Optional.empty();
+ }
+
+ Map newAssignments = new HashMap<>();
+ ImmutableMap.Builder expressionToVariableMappings = ImmutableMap.builder();
+
+ for (Map.Entry entry : columnProjections.entrySet()) {
+ ConnectorExpression expression = entry.getKey();
+ ProjectedColumnRepresentation projectedColumn = entry.getValue();
+
+ ColumnHandle projectedColumnHandle;
+ String projectedColumnName;
+
+ // See if input already contains a columnhandle for this projected column, avoid creating duplicates.
+ Optional existingColumn = find(assignments, projectedColumn);
+
+ if (existingColumn.isPresent()) {
+ projectedColumnName = existingColumn.get();
+ projectedColumnHandle = assignments.get(projectedColumnName);
+ }
+ else {
+ // Create a new column handle
+ HiveColumnHandle oldColumnHandle = (HiveColumnHandle) assignments.get(projectedColumn.getVariable().getName());
+ projectedColumnHandle = createProjectedColumnHandle(oldColumnHandle, projectedColumn.getDereferenceIndices());
+ projectedColumnName = ((HiveColumnHandle) projectedColumnHandle).getName();
+ }
+
+ Variable projectedColumnVariable = new Variable(projectedColumnName, expression.getType());
+ Assignment newAssignment = new Assignment(projectedColumnName, projectedColumnHandle, expression.getType());
+ newAssignments.put(projectedColumnName, newAssignment);
+
+ expressionToVariableMappings.put(expression, projectedColumnVariable);
+ }
+
+ // Modify projections to refer to new variables
+ List newProjections = projections.stream()
+ .map(expression -> replaceWithNewVariables(expression, expressionToVariableMappings.build()))
+ .collect(toImmutableList());
+
+ List outputAssignments = newAssignments.values().stream().collect(toImmutableList());
+ return Optional.of(new ProjectionApplicationResult<>(handle, newProjections, outputAssignments));
+ }
+
+ private HiveColumnHandle createProjectedColumnHandle(HiveColumnHandle column, List<Integer> indices)
+ {
+ HiveType oldHiveType = column.getHiveType();
+ HiveType newHiveType = oldHiveType.getHiveTypeForDereferences(indices).get();
+
+ HiveColumnProjectionInfo columnProjectionInfo = new HiveColumnProjectionInfo(
+ // Merge indices
+ ImmutableList.<Integer>builder()
+ .addAll(column.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceIndices)
+ .orElse(ImmutableList.of()))
+ .addAll(indices)
+ .build(),
+ // Merge names
+ ImmutableList.<String>builder()
+ .addAll(column.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceNames)
+ .orElse(ImmutableList.of()))
+ .addAll(oldHiveType.getHiveDereferenceNames(indices))
+ .build(),
+ newHiveType,
+ newHiveType.getType(typeManager));
+
+ return new HiveColumnHandle(
+ column.getBaseColumnName(),
+ column.getBaseHiveColumnIndex(),
+ column.getBaseHiveType(),
+ column.getBaseType(),
+ Optional.of(columnProjectionInfo),
+ column.getColumnType(),
+ column.getComment());
+ }
+
@Override
public Optional getCommonPartitioningHandle(ConnectorSession session, ConnectorPartitioningHandle left, ConnectorPartitioningHandle right)
{
@@ -2336,11 +2440,11 @@ else if (column.isHidden()) {
else {
columnType = REGULAR;
}
- columnHandles.add(new HiveColumnHandle(
+ columnHandles.add(createBaseColumn(
column.getName(),
+ ordinal,
toHiveType(typeTranslator, column.getType()),
column.getType(),
- ordinal,
columnType,
Optional.ofNullable(column.getComment())));
ordinal++;
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSource.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSource.java
index b7aa82f3f46d..b4d8785cbb22 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSource.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSource.java
@@ -59,6 +59,7 @@
import static io.airlift.slice.Slices.utf8Slice;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
+import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMappingKind.EMPTY;
import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMappingKind.PREFILLED;
import static io.prestosql.plugin.hive.HiveType.HIVE_BYTE;
import static io.prestosql.plugin.hive.HiveType.HIVE_DOUBLE;
@@ -120,12 +121,14 @@ public class HivePageSource
private final Object[] prefilledValues;
private final Type[] types;
private final List>> coercers;
+ private final Optional projectionsAdapter;
private final ConnectorPageSource delegate;
public HivePageSource(
List columnMappings,
Optional bucketAdaptation,
+ Optional projectionsAdapter,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
ConnectorPageSource delegate)
@@ -138,6 +141,8 @@ public HivePageSource(
this.columnMappings = columnMappings;
this.bucketAdapter = bucketAdaptation.map(BucketAdapter::new);
+ this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null");
+
int size = columnMappings.size();
prefilledValues = new Object[size];
@@ -152,14 +157,22 @@ public HivePageSource(
Type type = column.getType();
types[columnIndex] = type;
- if (columnMapping.getCoercionFrom().isPresent()) {
- coercers.add(Optional.of(createCoercer(typeManager, columnMapping.getCoercionFrom().get(), columnMapping.getHiveColumnHandle().getHiveType())));
+ if (columnMapping.getKind() != EMPTY && columnMapping.getBaseTypeCoercionFrom().isPresent()) {
+ List dereferenceIndices = column.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceIndices)
+ .orElse(ImmutableList.of());
+ HiveType fromType = columnMapping.getBaseTypeCoercionFrom().get().getHiveTypeForDereferences(dereferenceIndices).get();
+ HiveType toType = columnMapping.getHiveColumnHandle().getHiveType();
+ coercers.add(Optional.of(createCoercer(typeManager, fromType, toType)));
}
else {
coercers.add(Optional.empty());
}
- if (columnMapping.getKind() == PREFILLED) {
+ if (columnMapping.getKind() == EMPTY) {
+ prefilledValues[columnIndex] = null;
+ }
+ else if (columnMapping.getKind() == PREFILLED) {
String columnValue = columnMapping.getPrefilledValue();
byte[] bytes = columnValue.getBytes(UTF_8);
@@ -246,6 +259,10 @@ public Page getNextPage()
return null;
}
+ if (projectionsAdapter.isPresent()) {
+ dataPage = projectionsAdapter.get().adaptPage(dataPage);
+ }
+
if (bucketAdapter.isPresent()) {
IntArrayList rowsToKeep = bucketAdapter.get().computeEligibleRowIds(dataPage);
dataPage = dataPage.getPositions(rowsToKeep.elements(), 0, rowsToKeep.size());
@@ -257,6 +274,7 @@ public Page getNextPage()
ColumnMapping columnMapping = columnMappings.get(fieldId);
switch (columnMapping.getKind()) {
case PREFILLED:
+ case EMPTY:
blocks.add(RunLengthEncodedBlock.create(types[fieldId], prefilledValues[fieldId], batchSize));
break;
case REGULAR:
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceFactory.java
index c18b0a2689c2..aceb46c7f202 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceFactory.java
@@ -24,9 +24,11 @@
import java.util.Optional;
import java.util.Properties;
+import static java.util.Objects.requireNonNull;
+
public interface HivePageSourceFactory
{
- Optional extends ConnectorPageSource> createPageSource(
+ Optional createPageSource(
Configuration configuration,
ConnectorSession session,
Path path,
@@ -38,4 +40,39 @@ Optional extends ConnectorPageSource> createPageSource(
TupleDomain effectivePredicate,
DateTimeZone hiveStorageTimeZone,
Optional deleteDeltaLocations);
+
+ /**
+ * A wrapper class for
+ * - delegate reader page source and
+ * - projection information for columns to be returned by the delegate
+ *
+ * Empty {@param projectedReaderColumns} indicates that the delegate page source reads the exact same columns provided to
+ * it in {@link HivePageSourceFactory#createPageSource}
+ */
+ class ReaderPageSourceWithProjections
+ {
+ private final ConnectorPageSource connectorPageSource;
+ private final Optional projectedReaderColumns;
+
+ public ReaderPageSourceWithProjections(ConnectorPageSource connectorPageSource, Optional projectedReaderColumns)
+ {
+ this.connectorPageSource = requireNonNull(connectorPageSource, "connectorPageSource is null");
+ this.projectedReaderColumns = requireNonNull(projectedReaderColumns, "projectedReaderColumns is null");
+ }
+
+ public ConnectorPageSource getConnectorPageSource()
+ {
+ return connectorPageSource;
+ }
+
+ public Optional getProjectedReaderColumns()
+ {
+ return projectedReaderColumns;
+ }
+
+ public static ReaderPageSourceWithProjections noProjectionAdaptation(ConnectorPageSource connectorPageSource)
+ {
+ return new ReaderPageSourceWithProjections(connectorPageSource, Optional.empty());
+ }
+ }
}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java
index 57f741e5c3d6..8f4f930d3c3f 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java
@@ -16,6 +16,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
+import io.prestosql.plugin.hive.HivePageSourceFactory.ReaderPageSourceWithProjections;
+import io.prestosql.plugin.hive.HiveRecordCursorProvider.ReaderRecordCursorWithProjections;
import io.prestosql.plugin.hive.HiveSplit.BucketConversion;
import io.prestosql.plugin.hive.util.HiveBucketing.BucketingVersion;
import io.prestosql.spi.connector.ColumnHandle;
@@ -37,6 +39,7 @@
import javax.inject.Inject;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -139,7 +142,7 @@ public static Optional createHivePageSource(
long fileModifiedTime,
Properties schema,
TupleDomain effectivePredicate,
- List hiveColumns,
+ List columns,
List partitionKeys,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
@@ -154,7 +157,7 @@ public static Optional createHivePageSource(
List columnMappings = ColumnMapping.buildColumnMappings(
partitionKeys,
- hiveColumns,
+ columns,
bucketConversion.map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
tableToPartitionMapping,
path,
@@ -163,25 +166,12 @@ public static Optional createHivePageSource(
fileModifiedTime);
List regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);
- Optional bucketAdaptation = bucketConversion.map(conversion -> {
- Map hiveIndexToBlockIndex = uniqueIndex(regularAndInterimColumnMappings, columnMapping -> columnMapping.getHiveColumnHandle().getHiveColumnIndex());
- int[] bucketColumnIndices = conversion.getBucketColumnHandles().stream()
- .mapToInt(columnHandle -> hiveIndexToBlockIndex.get(columnHandle.getHiveColumnIndex()).getIndex())
- .toArray();
- List bucketColumnHiveTypes = conversion.getBucketColumnHandles().stream()
- .map(columnHandle -> hiveIndexToBlockIndex.get(columnHandle.getHiveColumnIndex()).getHiveColumnHandle().getHiveType())
- .collect(toImmutableList());
- return new BucketAdaptation(
- bucketColumnIndices,
- bucketColumnHiveTypes,
- conversion.getBucketingVersion(),
- conversion.getTableBucketCount(),
- conversion.getPartitionBucketCount(),
- bucketNumber.getAsInt());
- });
+ Optional bucketAdaptation = createBucketAdaptation(bucketConversion, bucketNumber, regularAndInterimColumnMappings);
for (HivePageSourceFactory pageSourceFactory : pageSourceFactories) {
- Optional extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
+ List desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager);
+
+ Optional readerWithProjections = pageSourceFactory.createPageSource(
configuration,
session,
path,
@@ -189,18 +179,27 @@ public static Optional createHivePageSource(
length,
fileSize,
schema,
- toColumnHandles(regularAndInterimColumnMappings, true, typeManager),
+ desiredColumns,
effectivePredicate,
hiveStorageTimeZone,
deleteDeltaLocations);
- if (pageSource.isPresent()) {
- return Optional.of(
- new HivePageSource(
- columnMappings,
- bucketAdaptation,
- hiveStorageTimeZone,
- typeManager,
- pageSource.get()));
+
+ if (readerWithProjections.isPresent()) {
+ ConnectorPageSource pageSource = readerWithProjections.get().getConnectorPageSource();
+
+ Optional readerProjections = readerWithProjections.get().getProjectedReaderColumns();
+ Optional adapter = Optional.empty();
+ if (readerProjections.isPresent()) {
+ adapter = Optional.of(new ReaderProjectionsAdapter(desiredColumns, readerProjections.get()));
+ }
+
+ return Optional.of(new HivePageSource(
+ columnMappings,
+ bucketAdaptation,
+ adapter,
+ hiveStorageTimeZone,
+ typeManager,
+ pageSource));
}
}
@@ -208,7 +207,8 @@ public static Optional createHivePageSource(
// GenericHiveRecordCursor will automatically do the coercion without HiveCoercionRecordCursor
boolean doCoercion = !(provider instanceof GenericHiveRecordCursorProvider);
- Optional cursor = provider.createRecordCursor(
+ List desiredColumns = toColumnHandles(regularAndInterimColumnMappings, doCoercion, typeManager);
+ Optional readerWithProjections = provider.createRecordCursor(
configuration,
session,
path,
@@ -216,14 +216,20 @@ public static Optional createHivePageSource(
length,
fileSize,
schema,
- toColumnHandles(regularAndInterimColumnMappings, doCoercion, typeManager),
+ desiredColumns,
effectivePredicate,
hiveStorageTimeZone,
typeManager,
s3SelectPushdownEnabled);
- if (cursor.isPresent()) {
- RecordCursor delegate = cursor.get();
+ if (readerWithProjections.isPresent()) {
+ RecordCursor delegate = readerWithProjections.get().getRecordCursor();
+ Optional projections = readerWithProjections.get().getProjectedReaderColumns();
+
+ if (projections.isPresent()) {
+ ReaderProjectionsAdapter projectionsAdapter = new ReaderProjectionsAdapter(desiredColumns, projections.get());
+ delegate = new HiveReaderProjectionsAdaptingRecordCursor(delegate, projectionsAdapter);
+ }
checkArgument(!deleteDeltaLocations.isPresent(), "Delete delta is not supported");
@@ -248,7 +254,7 @@ public static Optional createHivePageSource(
columnMappings,
hiveStorageTimeZone,
delegate);
- List columnTypes = hiveColumns.stream()
+ List columnTypes = columns.stream()
.map(HiveColumnHandle::getType)
.collect(toList());
@@ -268,33 +274,45 @@ public static class ColumnMapping
* ordinal of this column in the underlying page source or record cursor
*/
private final OptionalInt index;
- private final Optional coercionFrom;
+ private final Optional baseTypeCoercionFrom;
- public static ColumnMapping regular(HiveColumnHandle hiveColumnHandle, int index, Optional coerceFrom)
+ public static ColumnMapping regular(HiveColumnHandle hiveColumnHandle, int index, Optional baseTypeCoercionFrom)
{
checkArgument(hiveColumnHandle.getColumnType() == REGULAR);
- return new ColumnMapping(ColumnMappingKind.REGULAR, hiveColumnHandle, Optional.empty(), OptionalInt.of(index), coerceFrom);
+ return new ColumnMapping(ColumnMappingKind.REGULAR, hiveColumnHandle, Optional.empty(), OptionalInt.of(index), baseTypeCoercionFrom);
}
- public static ColumnMapping prefilled(HiveColumnHandle hiveColumnHandle, String prefilledValue, Optional coerceFrom)
+ public static ColumnMapping prefilled(HiveColumnHandle hiveColumnHandle, String prefilledValue, Optional baseTypeCoercionFrom)
{
checkArgument(hiveColumnHandle.getColumnType() == PARTITION_KEY || hiveColumnHandle.getColumnType() == SYNTHESIZED);
- return new ColumnMapping(ColumnMappingKind.PREFILLED, hiveColumnHandle, Optional.of(prefilledValue), OptionalInt.empty(), coerceFrom);
+ checkArgument(hiveColumnHandle.isBaseColumn(), "prefilled values not supported for projected columns");
+ return new ColumnMapping(ColumnMappingKind.PREFILLED, hiveColumnHandle, Optional.of(prefilledValue), OptionalInt.empty(), baseTypeCoercionFrom);
}
- public static ColumnMapping interim(HiveColumnHandle hiveColumnHandle, int index)
+ public static ColumnMapping interim(HiveColumnHandle hiveColumnHandle, int index, Optional baseTypeCoercionFrom)
{
checkArgument(hiveColumnHandle.getColumnType() == REGULAR);
- return new ColumnMapping(ColumnMappingKind.INTERIM, hiveColumnHandle, Optional.empty(), OptionalInt.of(index), Optional.empty());
+ return new ColumnMapping(ColumnMappingKind.INTERIM, hiveColumnHandle, Optional.empty(), OptionalInt.of(index), baseTypeCoercionFrom);
}
- private ColumnMapping(ColumnMappingKind kind, HiveColumnHandle hiveColumnHandle, Optional prefilledValue, OptionalInt index, Optional coerceFrom)
+ public static ColumnMapping empty(HiveColumnHandle hiveColumnHandle)
+ {
+ checkArgument(hiveColumnHandle.getColumnType() == REGULAR);
+ return new ColumnMapping(ColumnMappingKind.EMPTY, hiveColumnHandle, Optional.empty(), OptionalInt.empty(), Optional.empty());
+ }
+
+ private ColumnMapping(
+ ColumnMappingKind kind,
+ HiveColumnHandle hiveColumnHandle,
+ Optional prefilledValue,
+ OptionalInt index,
+ Optional baseTypeCoercionFrom)
{
this.kind = requireNonNull(kind, "kind is null");
this.hiveColumnHandle = requireNonNull(hiveColumnHandle, "hiveColumnHandle is null");
this.prefilledValue = requireNonNull(prefilledValue, "prefilledValue is null");
this.index = requireNonNull(index, "index is null");
- this.coercionFrom = requireNonNull(coerceFrom, "coerceFrom is null");
+ this.baseTypeCoercionFrom = requireNonNull(baseTypeCoercionFrom, "baseTypeCoercionFrom is null");
}
public ColumnMappingKind getKind()
@@ -319,9 +337,9 @@ public int getIndex()
return index.getAsInt();
}
- public Optional getCoercionFrom()
+ public Optional getBaseTypeCoercionFrom()
{
- return coercionFrom;
+ return baseTypeCoercionFrom;
}
public static List buildColumnMappings(
@@ -335,37 +353,72 @@ public static List buildColumnMappings(
long fileModifiedTime)
{
Map partitionKeysByName = uniqueIndex(partitionKeys, HivePartitionKey::getName);
- int regularIndex = 0;
- Set regularColumnIndices = new HashSet<>();
+
+ // Maintain state about hive columns added to the mapping as we iterate (for validation)
+ Set baseColumnHiveIndices = new HashSet<>();
+ Map>> projectionsForColumn = new HashMap<>();
+
ImmutableList.Builder columnMappings = ImmutableList.builder();
+ int regularIndex = 0;
+
for (HiveColumnHandle column : columns) {
- Optional coercionFrom = tableToPartitionMapping.getCoercion(column.getHiveColumnIndex());
+ Optional baseTypeCoercionFrom = tableToPartitionMapping.getCoercion(column.getBaseHiveColumnIndex());
+
if (column.getColumnType() == REGULAR) {
- checkArgument(regularColumnIndices.add(column.getHiveColumnIndex()), "duplicate hiveColumnIndex in columns list");
- columnMappings.add(regular(column, regularIndex, coercionFrom));
- regularIndex++;
+ if (column.isBaseColumn()) {
+ baseColumnHiveIndices.add(column.getBaseHiveColumnIndex());
+ }
+
+ checkArgument(
+ projectionsForColumn.computeIfAbsent(column.getBaseHiveColumnIndex(), HashSet::new).add(column.getHiveColumnProjectionInfo()),
+ "duplicate column in columns list");
+
+ // Add regular mapping if projection is valid for partition schema, otherwise add an empty mapping
+ if (!baseTypeCoercionFrom.isPresent()
+ || projectionValidForType(baseTypeCoercionFrom.get(), column.getHiveColumnProjectionInfo())) {
+ columnMappings.add(regular(column, regularIndex, baseTypeCoercionFrom));
+ regularIndex++;
+ }
+ else {
+ columnMappings.add(empty(column));
+ }
}
else {
columnMappings.add(prefilled(
column,
getPrefilledColumnValue(column, partitionKeysByName.get(column.getName()), path, bucketNumber, fileSize, fileModifiedTime),
- coercionFrom));
+ baseTypeCoercionFrom));
}
}
+
for (HiveColumnHandle column : requiredInterimColumns) {
checkArgument(column.getColumnType() == REGULAR);
- if (regularColumnIndices.contains(column.getHiveColumnIndex())) {
+ checkArgument(column.isBaseColumn(), "bucketed columns should be base columns");
+ if (baseColumnHiveIndices.contains(column.getBaseHiveColumnIndex())) {
continue; // This column exists in columns. Do not add it again.
}
- // If coercion does not affect bucket number calculation, coercion doesn't need to be applied here.
- // Otherwise, read of this partition should not be allowed.
- // (Alternatively, the partition could be read as an unbucketed partition. This is not implemented.)
- columnMappings.add(interim(column, regularIndex));
+
+ if (projectionsForColumn.containsKey(column.getBaseHiveColumnIndex())) {
+ columnMappings.add(interim(column, regularIndex, tableToPartitionMapping.getCoercion(column.getBaseHiveColumnIndex())));
+ }
+ else {
+ // If coercion does not affect bucket number calculation, coercion doesn't need to be applied here.
+ // Otherwise, read of this partition should not be allowed.
+ // (Alternatively, the partition could be read as an unbucketed partition. This is not implemented.)
+ columnMappings.add(interim(column, regularIndex, Optional.empty()));
+ }
regularIndex++;
}
return columnMappings.build();
}
+ private static boolean projectionValidForType(HiveType baseType, Optional projection)
+ {
+ List dereferences = projection.map(HiveColumnProjectionInfo::getDereferenceIndices).orElse(ImmutableList.of());
+ Optional targetType = baseType.getHiveTypeForDereferences(dereferences);
+ return targetType.isPresent();
+ }
+
public static List extractRegularAndInterimColumnMappings(List columnMappings)
{
return columnMappings.stream()
@@ -378,16 +431,28 @@ public static List toColumnHandles(List regular
return regularColumnMappings.stream()
.map(columnMapping -> {
HiveColumnHandle columnHandle = columnMapping.getHiveColumnHandle();
- if (!doCoercion || !columnMapping.getCoercionFrom().isPresent()) {
+ if (!doCoercion || !columnMapping.getBaseTypeCoercionFrom().isPresent()) {
return columnHandle;
}
+ HiveType fromHiveTypeBase = columnMapping.getBaseTypeCoercionFrom().get();
+
+ Optional newColumnProjectionInfo = columnHandle.getHiveColumnProjectionInfo().map(projectedColumn -> {
+ HiveType fromHiveType = fromHiveTypeBase.getHiveTypeForDereferences(projectedColumn.getDereferenceIndices()).get();
+ return new HiveColumnProjectionInfo(
+ projectedColumn.getDereferenceIndices(),
+ projectedColumn.getDereferenceNames(),
+ fromHiveType,
+ fromHiveType.getType(typeManager));
+ });
+
return new HiveColumnHandle(
- columnHandle.getName(),
- columnMapping.getCoercionFrom().get(),
- columnMapping.getCoercionFrom().get().getType(typeManager),
- columnHandle.getHiveColumnIndex(),
+ columnHandle.getBaseColumnName(),
+ columnHandle.getBaseHiveColumnIndex(),
+ fromHiveTypeBase,
+ fromHiveTypeBase.getType(typeManager),
+ newColumnProjectionInfo,
columnHandle.getColumnType(),
- Optional.empty());
+ columnHandle.getComment());
})
.collect(toList());
}
@@ -398,6 +463,31 @@ public enum ColumnMappingKind
REGULAR,
PREFILLED,
INTERIM,
+ EMPTY
+ }
+
+ private static Optional createBucketAdaptation(Optional bucketConversion, OptionalInt bucketNumber, List columnMappings)
+ {
+ return bucketConversion.map(conversion -> {
+ List baseColumnMapping = columnMappings.stream()
+ .filter(mapping -> mapping.getHiveColumnHandle().isBaseColumn())
+ .collect(toList());
+ Map baseHiveColumnToBlockIndex = uniqueIndex(baseColumnMapping, mapping -> mapping.getHiveColumnHandle().getBaseHiveColumnIndex());
+
+ int[] bucketColumnIndices = conversion.getBucketColumnHandles().stream()
+ .mapToInt(columnHandle -> baseHiveColumnToBlockIndex.get(columnHandle.getBaseHiveColumnIndex()).getIndex())
+ .toArray();
+ List bucketColumnHiveTypes = conversion.getBucketColumnHandles().stream()
+ .map(columnHandle -> baseHiveColumnToBlockIndex.get(columnHandle.getBaseHiveColumnIndex()).getHiveColumnHandle().getHiveType())
+ .collect(toImmutableList());
+ return new BucketAdaptation(
+ bucketColumnIndices,
+ bucketColumnHiveTypes,
+ conversion.getBucketingVersion(),
+ conversion.getTableBucketCount(),
+ conversion.getPartitionBucketCount(),
+ bucketNumber.getAsInt());
+ });
}
public static class BucketAdaptation
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveReaderProjectionsAdaptingRecordCursor.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveReaderProjectionsAdaptingRecordCursor.java
new file mode 100644
index 000000000000..0d6a7ae9f480
--- /dev/null
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveReaderProjectionsAdaptingRecordCursor.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.hive;
+
+import com.google.common.collect.Iterables;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.ReaderProjectionsAdapter.ChannelMapping;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.connector.RecordCursor;
+import io.prestosql.spi.type.Type;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Applies projections on delegate fields provided by {@link ChannelMapping} to produce fields expected from this cursor.
+ */
+public class HiveReaderProjectionsAdaptingRecordCursor
+ implements RecordCursor
+{
+ private final RecordCursor delegate;
+ private final ChannelMapping[] channelMappings;
+ private final Type[] outputTypes;
+ private final Type[] inputTypes;
+
+ private final Type[] baseTypes;
+
+ public HiveReaderProjectionsAdaptingRecordCursor(RecordCursor delegate, ReaderProjectionsAdapter projectionsAdapter)
+ {
+ this.delegate = requireNonNull(delegate, "delegate is null");
+ requireNonNull(projectionsAdapter, "projectionsAdapter is null");
+
+ this.channelMappings = new ChannelMapping[projectionsAdapter.getOutputToInputMapping().size()];
+ projectionsAdapter.getOutputToInputMapping().toArray(channelMappings);
+
+ this.outputTypes = new Type[projectionsAdapter.getOutputTypes().size()];
+ projectionsAdapter.getOutputTypes().toArray(outputTypes);
+
+ this.inputTypes = new Type[projectionsAdapter.getInputTypes().size()];
+ projectionsAdapter.getInputTypes().toArray(inputTypes);
+
+ this.baseTypes = new Type[outputTypes.length];
+ for (int i = 0; i < baseTypes.length; i++) {
+ Type type = inputTypes[channelMappings[i].getInputChannelIndex()];
+ List dereferences = channelMappings[i].getDereferenceSequence();
+ for (int j = 0; j < dereferences.size(); j++) {
+ type = type.getTypeParameters().get(dereferences.get(j));
+ }
+ baseTypes[i] = type;
+ }
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return delegate.getCompletedBytes();
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return delegate.getReadTimeNanos();
+ }
+
+ @Override
+ public Type getType(int field)
+ {
+ return outputTypes[field];
+ }
+
+ @Override
+ public boolean advanceNextPosition()
+ {
+ return delegate.advanceNextPosition();
+ }
+
+ private Block applyDereferences(Block baseObject, List dereferences, int length)
+ {
+ checkArgument(length <= dereferences.size());
+ Block current = baseObject;
+ for (int i = 0; i < length; i++) {
+ current = current.getObject(dereferences.get(i), Block.class);
+ }
+ return current;
+ }
+
+ @Override
+ public boolean getBoolean(int field)
+ {
+ int inputFieldIndex = channelMappings[field].getInputChannelIndex();
+ List dereferences = channelMappings[field].getDereferenceSequence();
+
+ if (dereferences.isEmpty()) {
+ return delegate.getBoolean(inputFieldIndex);
+ }
+
+ // Get SingleRowBlock corresponding to the element at current position
+ Block elementBlock = (Block) delegate.getObject(inputFieldIndex);
+
+ // Apply dereferences except for the last one, which is type dependent
+ Block baseObject = applyDereferences(elementBlock, dereferences, dereferences.size() - 1);
+
+ return baseTypes[field].getBoolean(baseObject, Iterables.getLast(dereferences));
+ }
+
+ @Override
+ public long getLong(int field)
+ {
+ int inputFieldIndex = channelMappings[field].getInputChannelIndex();
+ List dereferences = channelMappings[field].getDereferenceSequence();
+
+ if (dereferences.isEmpty()) {
+ return delegate.getLong(inputFieldIndex);
+ }
+
+ // Get SingleRowBlock corresponding to the element at current position
+ Block elementBlock = (Block) delegate.getObject(inputFieldIndex);
+
+ // Apply dereferences except for the last one, which is type dependent
+ Block baseObject = applyDereferences(elementBlock, dereferences, dereferences.size() - 1);
+
+ return baseTypes[field].getLong(baseObject, Iterables.getLast(dereferences));
+ }
+
+ @Override
+ public double getDouble(int field)
+ {
+ int inputFieldIndex = channelMappings[field].getInputChannelIndex();
+ List dereferences = channelMappings[field].getDereferenceSequence();
+
+ if (dereferences.isEmpty()) {
+ return delegate.getDouble(inputFieldIndex);
+ }
+
+ // Get SingleRowBlock corresponding to the element at current position
+ Block elementBlock = (Block) delegate.getObject(inputFieldIndex);
+
+ // Apply dereferences except for the last one, which is type dependent
+ Block baseObject = applyDereferences(elementBlock, dereferences, dereferences.size() - 1);
+
+ return baseTypes[field].getDouble(baseObject, Iterables.getLast(dereferences));
+ }
+
+ @Override
+ public Slice getSlice(int field)
+ {
+ int inputFieldIndex = channelMappings[field].getInputChannelIndex();
+ List dereferences = channelMappings[field].getDereferenceSequence();
+
+ if (dereferences.isEmpty()) {
+ return delegate.getSlice(inputFieldIndex);
+ }
+
+ // Get SingleRowBlock corresponding to the element at current position
+ Block elementBlock = (Block) delegate.getObject(inputFieldIndex);
+
+ // Apply dereferences except for the last one, which is type dependent
+ Block baseObject = applyDereferences(elementBlock, dereferences, dereferences.size() - 1);
+
+ return baseTypes[field].getSlice(baseObject, Iterables.getLast(dereferences));
+ }
+
+ @Override
+ public Object getObject(int field)
+ {
+ int inputFieldIndex = channelMappings[field].getInputChannelIndex();
+ List dereferences = channelMappings[field].getDereferenceSequence();
+
+ if (dereferences.isEmpty()) {
+ return delegate.getObject(inputFieldIndex);
+ }
+
+ // Get SingleRowBlock corresponding to the element at current position
+ Block elementBlock = (Block) delegate.getObject(inputFieldIndex);
+
+ // Apply dereferences except for the last one, which is type dependent
+ Block baseObject = applyDereferences(elementBlock, dereferences, dereferences.size() - 1);
+
+ return baseTypes[field].getObject(baseObject, Iterables.getLast(dereferences));
+ }
+
+ @Override
+ public boolean isNull(int field)
+ {
+ int inputFieldIndex = channelMappings[field].getInputChannelIndex();
+ List dereferences = channelMappings[field].getDereferenceSequence();
+
+ if (dereferences.isEmpty()) {
+ return delegate.isNull(inputFieldIndex);
+ }
+
+ if (delegate.isNull(inputFieldIndex)) {
+ return true;
+ }
+
+ // Get SingleRowBlock corresponding to the element at current position
+ Block baseObject = (Block) delegate.getObject(inputFieldIndex);
+
+ for (int j = 0; j < dereferences.size() - 1; j++) {
+ int dereferenceIndex = dereferences.get(j);
+ if (baseObject.isNull(dereferenceIndex)) {
+ return true;
+ }
+ baseObject = baseObject.getObject(dereferenceIndex, Block.class);
+ }
+
+ int finalDereference = Iterables.getLast(dereferences);
+ return baseObject.isNull(finalDereference);
+ }
+
+ @Override
+ public long getSystemMemoryUsage()
+ {
+ return delegate.getSystemMemoryUsage();
+ }
+
+ @Override
+ public void close()
+ {
+ delegate.close();
+ }
+}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursor.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursor.java
index 58aa7bbd3b1a..fe5e9dfb7a9d 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursor.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursor.java
@@ -25,6 +25,7 @@
import java.util.List;
+import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMappingKind.EMPTY;
import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMappingKind.PREFILLED;
import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMappingKind.REGULAR;
import static io.prestosql.plugin.hive.util.HiveUtil.bigintPartitionKey;
@@ -99,6 +100,9 @@ public HiveRecordCursor(
for (int columnIndex = 0; columnIndex < size; columnIndex++) {
ColumnMapping columnMapping = columnMappings.get(columnIndex);
+ if (columnMapping.getKind() == EMPTY) {
+ nulls[columnIndex] = true;
+ }
if (columnMapping.getKind() == PREFILLED) {
String columnValue = columnMapping.getPrefilledValue();
byte[] bytes = columnValue.getBytes(UTF_8);
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursorProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursorProvider.java
index c84e036d63a0..f307921154e3 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursorProvider.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveRecordCursorProvider.java
@@ -25,9 +25,11 @@
import java.util.Optional;
import java.util.Properties;
+import static java.util.Objects.requireNonNull;
+
public interface HiveRecordCursorProvider
{
- Optional createRecordCursor(
+ Optional createRecordCursor(
Configuration configuration,
ConnectorSession session,
Path path,
@@ -40,4 +42,34 @@ Optional createRecordCursor(
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
boolean s3SelectPushdownEnabled);
+
+ /**
+ * A wrapper class for
+ * - delegate reader record cursor and
+ * - projection information for columns to be returned by the delegate
+ *
+ * Empty {@param projectedReaderColumns} indicates that the delegate cursor reads the exact same columns provided to
+ * it in {@link HiveRecordCursorProvider#createRecordCursor}
+ */
+ class ReaderRecordCursorWithProjections
+ {
+ private final RecordCursor recordCursor;
+ private final Optional projectedReaderColumns;
+
+ public ReaderRecordCursorWithProjections(RecordCursor recordCursor, Optional projectedReaderColumns)
+ {
+ this.recordCursor = requireNonNull(recordCursor, "recordCursor is null");
+ this.projectedReaderColumns = requireNonNull(projectedReaderColumns, "projectedReaderColumns is null");
+ }
+
+ public RecordCursor getRecordCursor()
+ {
+ return recordCursor;
+ }
+
+ public Optional getProjectedReaderColumns()
+ {
+ return projectedReaderColumns;
+ }
+ }
}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java
index 393d5ee3348f..a7d4bb4e87a1 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java
@@ -37,6 +37,7 @@
import java.util.Locale;
import java.util.Optional;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.BigintType.BIGINT;
@@ -279,4 +280,39 @@ public static Type getPrimitiveType(PrimitiveTypeInfo typeInfo)
return null;
}
}
+
+ public Optional<HiveType> getHiveTypeForDereferences(List<Integer> dereferences)
+ {
+ TypeInfo typeInfo = getTypeInfo();
+ for (int fieldIndex : dereferences) {
+ checkArgument(typeInfo instanceof StructTypeInfo, "typeInfo should be struct type", typeInfo);
+ StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ try {
+ typeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
+ }
+ catch (RuntimeException e) {
+ return Optional.empty();
+ }
+ }
+ return Optional.of(toHiveType(typeInfo));
+ }
+
+ public List<String> getHiveDereferenceNames(List<Integer> dereferences)
+ {
+ ImmutableList.Builder<String> dereferenceNames = ImmutableList.builder();
+ TypeInfo typeInfo = getTypeInfo();
+ for (int fieldIndex : dereferences) {
+ checkArgument(typeInfo instanceof StructTypeInfo, "typeInfo should be struct type", typeInfo);
+ StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+
+ checkArgument(fieldIndex >= 0, "fieldIndex cannot be negative");
+ checkArgument(fieldIndex < structTypeInfo.getAllStructFieldNames().size(),
+ "fieldIndex should be less than the number of fields in the struct");
+ String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+ dereferenceNames.add(fieldName);
+ typeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
+ }
+
+ return dereferenceNames.build();
+ }
}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/IonSqlQueryBuilder.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/IonSqlQueryBuilder.java
index 5c1f2c174090..681dc84c0aef 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/IonSqlQueryBuilder.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/IonSqlQueryBuilder.java
@@ -62,6 +62,11 @@ public IonSqlQueryBuilder(TypeManager typeManager)
public String buildSql(List<HiveColumnHandle> columns, TupleDomain<HiveColumnHandle> tupleDomain)
{
+ columns.forEach(column -> checkArgument(column.isBaseColumn(), "%s is not a base column", column));
+ tupleDomain.getDomains().ifPresent(domains -> {
+ domains.keySet().forEach(column -> checkArgument(column.isBaseColumn(), "%s is not a base column", column));
+ });
+
StringBuilder sql = new StringBuilder("SELECT ");
if (columns.isEmpty()) {
@@ -69,7 +74,7 @@ public String buildSql(List<HiveColumnHandle> columns, TupleDomain<HiveColumnHandle> tupleDomain)
- .map(column -> format("s._%d", column.getHiveColumnIndex() + 1))
+ .map(column -> format("s._%d", column.getBaseHiveColumnIndex() + 1))
.collect(joining(", "));
sql.append(columnNames);
}
@@ -94,7 +99,7 @@ private List<String> toConjuncts(List<HiveColumnHandle> columns, TupleDomain<HiveColumnHandle> tupleDomain)
+ private final List<HiveColumnHandle> readerColumns;
+ // indices for mapping expected hive column handles to the reader's column handles
+ private final List<Integer> readerBlockIndices;
+
+ private ReaderProjections(List<HiveColumnHandle> readerColumns, List<Integer> readerBlockIndices)
+ {
+ this.readerColumns = ImmutableList.copyOf(requireNonNull(readerColumns, "readerColumns is null"));
+
+ readerBlockIndices.forEach(value -> checkArgument(value >= 0 && value < readerColumns.size(), "block index out of bounds"));
+ this.readerBlockIndices = ImmutableList.copyOf(requireNonNull(readerBlockIndices, "readerBlockIndices is null"));
+ }
+
+ /**
+ * For a column required by the {@link HivePageSource}, returns the column read by the delegate page source or record cursor.
+ */
+ public HiveColumnHandle readerColumnForHiveColumnAt(int index)
+ {
+ checkArgument(index >= 0 && index < readerBlockIndices.size(), "index is not valid");
+ int readerIndex = readerBlockIndices.get(index);
+ return readerColumns.get(readerIndex);
+ }
+
+ /**
+ * For a channel expected by {@link HivePageSource}, returns the channel index in the underlying page source or record cursor.
+ */
+ public int readerColumnPositionForHiveColumnAt(int index)
+ {
+ checkArgument(index >= 0 && index < readerBlockIndices.size(), "index is invalid");
+ return readerBlockIndices.get(index);
+ }
+
+ /**
+ * returns the actual list of columns being read by underlying page source or record cursor in order.
+ */
+ public List<HiveColumnHandle> getReaderColumns()
+ {
+ return readerColumns;
+ }
+
+ /**
+ * Creates a mapping between the input {@param columns} and base columns if required.
+ */
+ public static Optional<ReaderProjections> projectBaseColumns(List<HiveColumnHandle> columns)
+ {
+ requireNonNull(columns, "columns is null");
+
+ // No projection is required if all columns are base columns
+ if (columns.stream().allMatch(HiveColumnHandle::isBaseColumn)) {
+ return Optional.empty();
+ }
+
+ ImmutableList.Builder<HiveColumnHandle> projectedColumns = ImmutableList.builder();
+ ImmutableList.Builder<Integer> outputColumnMapping = ImmutableList.builder();
+ Map<Integer, Integer> mappedHiveColumnIndices = new HashMap<>();
+ int projectedColumnCount = 0;
+
+ for (HiveColumnHandle column : columns) {
+ int hiveColumnIndex = column.getBaseHiveColumnIndex();
+ Integer mapped = mappedHiveColumnIndices.get(hiveColumnIndex);
+
+ if (mapped == null) {
+ projectedColumns.add(column.getBaseColumn());
+ mappedHiveColumnIndices.put(hiveColumnIndex, projectedColumnCount);
+ outputColumnMapping.add(projectedColumnCount);
+ projectedColumnCount++;
+ }
+ else {
+ outputColumnMapping.add(mapped);
+ }
+ }
+
+ return Optional.of(new ReaderProjections(projectedColumns.build(), outputColumnMapping.build()));
+ }
+
+ /**
+ * Creates a set of sufficient columns for the input projected columns and prepares a mapping between the two. For example,
+ * if input {@param columns} include columns "a.b" and "a.b.c", then they will be projected from a single column "a.b".
+ */
+ public static Optional<ReaderProjections> projectSufficientColumns(List<HiveColumnHandle> columns)
+ {
+ requireNonNull(columns, "columns is null");
+
+ if (columns.stream().allMatch(HiveColumnHandle::isBaseColumn)) {
+ return Optional.empty();
+ }
+
+ ImmutableBiMap.Builder<DereferenceChain, HiveColumnHandle> dereferenceChainsBuilder = ImmutableBiMap.builder();
+
+ for (HiveColumnHandle column : columns) {
+ List<Integer> indices = column.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceIndices)
+ .orElse(ImmutableList.of());
+
+ DereferenceChain dereferenceChain = new DereferenceChain(column.getBaseColumnName(), indices);
+ dereferenceChainsBuilder.put(dereferenceChain, column);
+ }
+
+ BiMap<DereferenceChain, HiveColumnHandle> dereferenceChains = dereferenceChainsBuilder.build();
+
+ List<HiveColumnHandle> sufficientColumns = new ArrayList<>();
+ ImmutableList.Builder<Integer> outputColumnMapping = ImmutableList.builder();
+
+ Map<DereferenceChain, Integer> pickedColumns = new HashMap<>();
+
+ // Pick a covering column for every column
+ for (HiveColumnHandle columnHandle : columns) {
+ DereferenceChain column = dereferenceChains.inverse().get(columnHandle);
+ List<DereferenceChain> orderedPrefixes = column.getOrderedPrefixes();
+ DereferenceChain chosenColumn = null;
+
+ // Shortest existing prefix is chosen as the input.
+ for (DereferenceChain prefix : orderedPrefixes) {
+ if (dereferenceChains.containsKey(prefix)) {
+ chosenColumn = prefix;
+ break;
+ }
+ }
+
+ checkState(chosenColumn != null, "chosenColumn is null");
+ int inputBlockIndex;
+
+ if (pickedColumns.containsKey(chosenColumn)) {
+ // Use already picked column
+ inputBlockIndex = pickedColumns.get(chosenColumn);
+ }
+ else {
+ // Add a new column for the reader
+ sufficientColumns.add(dereferenceChains.get(chosenColumn));
+ pickedColumns.put(chosenColumn, sufficientColumns.size() - 1);
+ inputBlockIndex = sufficientColumns.size() - 1;
+ }
+
+ outputColumnMapping.add(inputBlockIndex);
+ }
+
+ return Optional.of(new ReaderProjections(sufficientColumns, outputColumnMapping.build()));
+ }
+
+ private static class DereferenceChain
+ {
+ private final String name;
+ private final List<Integer> indices;
+
+ public DereferenceChain(String name, List<Integer> indices)
+ {
+ this.name = requireNonNull(name, "name is null");
+ this.indices = ImmutableList.copyOf(requireNonNull(indices, "indices is null"));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DereferenceChain that = (DereferenceChain) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(indices, that.indices);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, indices);
+ }
+
+ /**
+ * Get Prefixes of this Dereference chain in increasing order of lengths
+ */
+ public List<DereferenceChain> getOrderedPrefixes()
+ {
+ ImmutableList.Builder<DereferenceChain> prefixes = ImmutableList.builder();
+
+ for (int prefixLen = 0; prefixLen <= indices.size(); prefixLen++) {
+ prefixes.add(new DereferenceChain(name, indices.subList(0, prefixLen)));
+ }
+
+ return prefixes.build();
+ }
+ }
+}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/ReaderProjectionsAdapter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/ReaderProjectionsAdapter.java
new file mode 100644
index 000000000000..d7bf5e278168
--- /dev/null
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/ReaderProjectionsAdapter.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.hive;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.block.BlockBuilder;
+import io.prestosql.spi.block.ColumnarRow;
+import io.prestosql.spi.block.LazyBlock;
+import io.prestosql.spi.block.LazyBlockLoader;
+import io.prestosql.spi.type.Type;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.ReaderProjectionsAdapter.ChannelMapping.createChannelMapping;
+import static io.prestosql.spi.block.ColumnarRow.toColumnarRow;
+import static java.util.Objects.requireNonNull;
+
+public class ReaderProjectionsAdapter
+{
+ private final List<ChannelMapping> outputToInputMapping;
+ private final List<Type> outputTypes;
+ private final List<Type> inputTypes;
+
+ public ReaderProjectionsAdapter(List<HiveColumnHandle> expectedHiveColumns, ReaderProjections readerProjections)
+ {
+ requireNonNull(expectedHiveColumns, "expectedHiveColumns is null");
+ requireNonNull(readerProjections, "readerProjections is null");
+
+ ImmutableList.Builder<ChannelMapping> mappingBuilder = ImmutableList.builder();
+
+ for (int i = 0; i < expectedHiveColumns.size(); i++) {
+ HiveColumnHandle projectedColumnHandle = readerProjections.readerColumnForHiveColumnAt(i);
+ int inputChannel = readerProjections.readerColumnPositionForHiveColumnAt(i);
+ ChannelMapping mapping = createChannelMapping(expectedHiveColumns.get(i), projectedColumnHandle, inputChannel);
+ mappingBuilder.add(mapping);
+ }
+
+ outputToInputMapping = mappingBuilder.build();
+
+ outputTypes = expectedHiveColumns.stream()
+ .map(HiveColumnHandle::getType)
+ .collect(toImmutableList());
+
+ inputTypes = readerProjections.getReaderColumns().stream()
+ .map(HiveColumnHandle::getType)
+ .collect(toImmutableList());
+ }
+
+ public Page adaptPage(Page input)
+ {
+ if (input == null) {
+ return null;
+ }
+
+ Block[] blocks = new Block[outputToInputMapping.size()];
+
+ // Prepare adaptations to extract dereferences
+ for (int i = 0; i < outputToInputMapping.size(); i++) {
+ ChannelMapping mapping = outputToInputMapping.get(i);
+
+ Block inputBlock = input.getBlock(mapping.getInputChannelIndex());
+ blocks[i] = createAdaptedLazyBlock(inputBlock, mapping.getDereferenceSequence(), outputTypes.get(i));
+ }
+
+ return new Page(input.getPositionCount(), blocks);
+ }
+
+ private static Block createAdaptedLazyBlock(Block inputBlock, List<Integer> dereferenceSequence, Type type)
+ {
+ if (dereferenceSequence.size() == 0) {
+ return inputBlock;
+ }
+
+ if (inputBlock == null) {
+ return null;
+ }
+
+ return new LazyBlock(inputBlock.getPositionCount(), new DereferenceBlockLoader(inputBlock, dereferenceSequence, type));
+ }
+
+ private static class DereferenceBlockLoader
+ implements LazyBlockLoader
+ {
+ private final List<Integer> dereferenceSequence;
+ private final Type type;
+ private boolean loaded;
+ private Block inputBlock;
+
+ DereferenceBlockLoader(Block inputBlock, List<Integer> dereferenceSequence, Type type)
+ {
+ this.inputBlock = requireNonNull(inputBlock, "inputBlock is null");
+ this.dereferenceSequence = requireNonNull(dereferenceSequence, "dereferenceSequence is null");
+ this.type = type;
+ }
+
+ @Override
+ public Block load()
+ {
+ checkState(!loaded, "Already loaded");
+ Block loadedBlock = loadInternalBlock(dereferenceSequence, inputBlock);
+ inputBlock = null;
+ loaded = true;
+ return loadedBlock;
+ }
+
+ /**
+ * Applies dereference operations on the input block to extract the required internal block. If the input block is lazy
+ * in a nested manner, this implementation avoids loading the entire input block.
+ */
+ private Block loadInternalBlock(List<Integer> dereferences, Block parentBlock)
+ {
+ if (dereferences.size() == 0) {
+ return parentBlock.getLoadedBlock();
+ }
+
+ ColumnarRow columnarRow = toColumnarRow(parentBlock);
+
+ int dereferenceIndex = dereferences.get(0);
+ List<Integer> remainingDereferences = dereferences.subList(1, dereferences.size());
+
+ Block fieldBlock = columnarRow.getField(dereferenceIndex);
+ Block loadedInternalBlock = loadInternalBlock(remainingDereferences, fieldBlock);
+
+ // Field blocks provided by ColumnarRow can have a smaller position count, because they do not store nulls.
+ // The following step adds null elements (when required) to the loaded block.
+ return adaptNulls(columnarRow, loadedInternalBlock);
+ }
+
+ private Block adaptNulls(ColumnarRow columnarRow, Block loadedInternalBlock)
+ {
+ // TODO: The current implementation copies over data to a new block builder when a null row element is found.
+ // We can optimize this by using a Block implementation that uses a null vector of the parent row block and
+ // the block for the field.
+
+ BlockBuilder newlyCreatedBlock = null;
+ int fieldBlockPosition = 0;
+
+ for (int i = 0; i < columnarRow.getPositionCount(); i++) {
+ boolean isRowNull = columnarRow.isNull(i);
+
+ if (isRowNull) {
+ // A new block is only created when a null is encountered for the first time.
+ if (newlyCreatedBlock == null) {
+ newlyCreatedBlock = type.createBlockBuilder(null, columnarRow.getPositionCount());
+
+ // Copy over all elements encountered so far to the new block
+ for (int j = 0; j < i; j++) {
+ type.appendTo(loadedInternalBlock, j, newlyCreatedBlock);
+ }
+ }
+ newlyCreatedBlock.appendNull();
+ }
+ else {
+ if (newlyCreatedBlock != null) {
+ type.appendTo(loadedInternalBlock, fieldBlockPosition, newlyCreatedBlock);
+ }
+ fieldBlockPosition++;
+ }
+ }
+
+ if (newlyCreatedBlock == null) {
+ // If there was no need to create a null, return the original block
+ return loadedInternalBlock;
+ }
+
+ return newlyCreatedBlock.build();
+ }
+ }
+
+ List<ChannelMapping> getOutputToInputMapping()
+ {
+ return outputToInputMapping;
+ }
+
+ List<Type> getOutputTypes()
+ {
+ return outputTypes;
+ }
+
+ List<Type> getInputTypes()
+ {
+ return inputTypes;
+ }
+
+ @VisibleForTesting
+ static class ChannelMapping
+ {
+ private final int inputChannelIndex;
+ private final List<Integer> dereferenceSequence;
+
+ private ChannelMapping(int inputBlockIndex, List<Integer> dereferenceSequence)
+ {
+ checkArgument(inputBlockIndex >= 0, "inputBlockIndex cannot be negative");
+ this.inputChannelIndex = inputBlockIndex;
+ this.dereferenceSequence = ImmutableList.copyOf(requireNonNull(dereferenceSequence, "dereferences is null"));
+ }
+
+ public int getInputChannelIndex()
+ {
+ return inputChannelIndex;
+ }
+
+ public List<Integer> getDereferenceSequence()
+ {
+ return dereferenceSequence;
+ }
+
+ static ChannelMapping createChannelMapping(HiveColumnHandle expected, HiveColumnHandle delegate, int inputBlockIndex)
+ {
+ List<Integer> dereferences = validateProjectionAndExtractDereferences(expected, delegate);
+ return new ChannelMapping(inputBlockIndex, dereferences);
+ }
+
+ private static List<Integer> validateProjectionAndExtractDereferences(HiveColumnHandle expectedColumn, HiveColumnHandle readerColumn)
+ {
+ checkArgument(expectedColumn.getBaseColumn().equals(readerColumn.getBaseColumn()), "reader column is not valid for expected column");
+
+ List<Integer> expectedDereferences = expectedColumn.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceIndices)
+ .orElse(ImmutableList.of());
+
+ List<Integer> readerDereferences = readerColumn.getHiveColumnProjectionInfo()
+ .map(HiveColumnProjectionInfo::getDereferenceIndices)
+ .orElse(ImmutableList.of());
+
+ checkArgument(readerDereferences.size() <= expectedDereferences.size(), "Field returned by the reader should include expected field");
+ checkArgument(expectedDereferences.subList(0, readerDereferences.size()).equals(readerDereferences), "Field returned by the reader should be a prefix of expected field");
+
+ return expectedDereferences.subList(readerDereferences.size(), expectedDereferences.size());
+ }
+ }
+}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java
index 5dc45a9fb4bb..601f54bee51e 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java
@@ -31,6 +31,7 @@
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HivePageSourceFactory;
+import io.prestosql.plugin.hive.ReaderProjections;
import io.prestosql.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorPageSource;
@@ -71,6 +72,7 @@
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILE_MISSING_COLUMN_NAMES;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA;
+import static io.prestosql.plugin.hive.HivePageSourceFactory.ReaderPageSourceWithProjections.noProjectionAdaptation;
import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcLazyReadSmallRanges;
import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcMaxBufferSize;
import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcMaxMergeDistance;
@@ -80,6 +82,7 @@
import static io.prestosql.plugin.hive.HiveSessionProperties.isOrcBloomFiltersEnabled;
import static io.prestosql.plugin.hive.HiveSessionProperties.isOrcNestedLazy;
import static io.prestosql.plugin.hive.HiveSessionProperties.isUseOrcColumnNames;
+import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
import static io.prestosql.plugin.hive.orc.OrcPageSource.handleException;
import static io.prestosql.plugin.hive.util.HiveUtil.isDeserializerClass;
import static io.prestosql.spi.type.BigintType.BIGINT;
@@ -122,7 +125,7 @@ public OrcPageSourceFactory(
}
@Override
- public Optional<? extends ConnectorPageSource> createPageSource(
+ public Optional<ReaderPageSourceWithProjections> createPageSource(
Configuration configuration,
ConnectorSession session,
Path path,
@@ -141,10 +144,14 @@ public Optional extends ConnectorPageSource> createPageSource(
// per HIVE-13040 and ORC-162, empty files are allowed
if (fileSize == 0) {
- return Optional.of(new FixedPageSource(ImmutableList.of()));
+ ReaderPageSourceWithProjections context = noProjectionAdaptation(new FixedPageSource(ImmutableList.of()));
+ return Optional.of(context);
}
- return Optional.of(createOrcPageSource(
+ Optional<ReaderProjections> projectedReaderColumns = projectBaseColumns(columns);
+ effectivePredicate = effectivePredicate.transform(column -> column.isBaseColumn() ? column : null);
+
+ ConnectorPageSource orcPageSource = createOrcPageSource(
hdfsEnvironment,
session.getUser(),
configuration,
@@ -152,7 +159,9 @@ public Optional extends ConnectorPageSource> createPageSource(
start,
length,
fileSize,
- columns,
+ projectedReaderColumns
+ .map(ReaderProjections::getReaderColumns)
+ .orElse(columns),
isUseOrcColumnNames(session),
isFullAcidTable(Maps.fromProperties(schema)),
effectivePredicate,
@@ -167,7 +176,9 @@ public Optional extends ConnectorPageSource> createPageSource(
.withNestedLazy(isOrcNestedLazy(session))
.withBloomFiltersEnabled(isOrcBloomFiltersEnabled(session)),
deleteDeltaLocations,
- stats));
+ stats);
+
+ return Optional.of(new ReaderPageSourceWithProjections(orcPageSource, projectedReaderColumns));
}
private static OrcPageSource createOrcPageSource(
@@ -249,8 +260,8 @@ private static OrcPageSource createOrcPageSource(
if (useOrcColumnNames || isFullAcid) {
orcColumn = fileColumnsByName.get(column.getName().toLowerCase(ENGLISH));
}
- else if (column.getHiveColumnIndex() < fileColumns.size()) {
- orcColumn = fileColumns.get(column.getHiveColumnIndex());
+ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) {
+ orcColumn = fileColumns.get(column.getBaseHiveColumnIndex());
}
Type readType = column.getType();
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java
index 028349c4eabc..a43a2d8a2aa6 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -31,6 +31,7 @@
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HivePageSourceFactory;
+import io.prestosql.plugin.hive.ReaderProjections;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.ConnectorSession;
@@ -78,6 +79,7 @@
import static io.prestosql.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockSize;
import static io.prestosql.plugin.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics;
import static io.prestosql.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
+import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
import static io.prestosql.plugin.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
import static io.prestosql.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.prestosql.plugin.hive.util.HiveUtil.getDeserializerClassName;
@@ -108,7 +110,7 @@ public ParquetPageSourceFactory(HdfsEnvironment hdfsEnvironment, FileFormatDataS
}
@Override
- public Optional<? extends ConnectorPageSource> createPageSource(
+ public Optional<ReaderPageSourceWithProjections> createPageSource(
Configuration configuration,
ConnectorSession session,
Path path,
@@ -127,7 +129,12 @@ public Optional extends ConnectorPageSource> createPageSource(
checkArgument(!deleteDeltaLocations.isPresent(), "Delete delta is not supported");
- return Optional.of(createParquetPageSource(
+ // Ignore predicates on partial columns for now.
+ effectivePredicate = effectivePredicate.transform(column -> column.isBaseColumn() ? column : null);
+
+ Optional<ReaderProjections> projectedReaderColumns = projectBaseColumns(columns);
+
+ ConnectorPageSource parquetPageSource = createParquetPageSource(
hdfsEnvironment,
session.getUser(),
configuration,
@@ -135,13 +142,17 @@ public Optional extends ConnectorPageSource> createPageSource(
start,
length,
fileSize,
- columns,
+ projectedReaderColumns
+ .map(ReaderProjections::getReaderColumns)
+ .orElse(columns),
isUseParquetColumnNames(session),
options
.withFailOnCorruptedStatistics(isFailOnCorruptedParquetStatistics(session))
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
effectivePredicate,
- stats));
+ stats);
+
+ return Optional.of(new ReaderPageSourceWithProjections(parquetPageSource, projectedReaderColumns));
}
public static ParquetPageSource createParquetPageSource(
@@ -220,7 +231,7 @@ public static ParquetPageSource createParquetPageSource(
prestoTypes.add(column.getType());
internalFields.add(parquetField.flatMap(field -> {
- String columnName = useParquetColumnNames ? column.getName() : fileSchema.getFields().get(column.getHiveColumnIndex()).getName();
+ String columnName = useParquetColumnNames ? column.getName() : fileSchema.getFields().get(column.getBaseHiveColumnIndex()).getName();
return constructField(column.getType(), lookupColumnByName(messageColumnIO, columnName));
}));
}
@@ -281,8 +292,8 @@ private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle co
return getParquetTypeByName(column.getName(), messageType);
}
- if (column.getHiveColumnIndex() < messageType.getFieldCount()) {
- return messageType.getType(column.getHiveColumnIndex());
+ if (column.getBaseHiveColumnIndex() < messageType.getFieldCount()) {
+ return messageType.getType(column.getBaseHiveColumnIndex());
}
return null;
}
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSource.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSource.java
index 039c05aab737..6d4e295b109d 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSource.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSource.java
@@ -79,7 +79,7 @@ public RcFilePageSource(RcFileReader rcFileReader, List column
typesBuilder.add(column.getType());
hiveTypesBuilder.add(column.getHiveType());
- hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex();
+ hiveColumnIndexes[columnIndex] = column.getBaseHiveColumnIndex();
if (hiveColumnIndexes[columnIndex] >= rcFileReader.getColumnCount()) {
// this file may contain fewer fields than what's declared in the schema
@@ -135,9 +135,7 @@ public Page getNextPage()
}
}
- Page page = new Page(currentPageSize, blocks);
-
- return page;
+ return new Page(currentPageSize, blocks);
}
catch (PrestoException e) {
closeWithSuppression(e);
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSourceFactory.java
index 760db67ab05e..1d6d4378ef14 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSourceFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/rcfile/RcFilePageSourceFactory.java
@@ -23,6 +23,7 @@
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HivePageSourceFactory;
+import io.prestosql.plugin.hive.ReaderProjections;
import io.prestosql.rcfile.AircompressorCodecFactory;
import io.prestosql.rcfile.HadoopCodecFactory;
import io.prestosql.rcfile.RcFileCorruptionException;
@@ -59,6 +60,7 @@
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA;
+import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
import static io.prestosql.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static io.prestosql.rcfile.text.TextRcFileEncoding.DEFAULT_NULL_SEQUENCE;
import static io.prestosql.rcfile.text.TextRcFileEncoding.DEFAULT_SEPARATORS;
@@ -93,7 +95,7 @@ public RcFilePageSourceFactory(TypeManager typeManager, HdfsEnvironment hdfsEnvi
}
@Override
- public Optional<? extends ConnectorPageSource> createPageSource(
+ public Optional<ReaderPageSourceWithProjections> createPageSource(
Configuration configuration,
ConnectorSession session,
Path path,
@@ -124,6 +126,12 @@ else if (deserializerClassName.equals(ColumnarSerDe.class.getName())) {
throw new PrestoException(HIVE_BAD_DATA, "RCFile is empty: " + path);
}
+ Optional<ReaderProjections> readerProjections = projectBaseColumns(columns);
+
+ List<HiveColumnHandle> projectedReaderColumns = readerProjections
+ .map(ReaderProjections::getReaderColumns)
+ .orElse(columns);
+
FSDataInputStream inputStream;
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), path, configuration);
@@ -139,8 +147,8 @@ else if (deserializerClassName.equals(ColumnarSerDe.class.getName())) {
try {
ImmutableMap.Builder<Integer, Type> readColumns = ImmutableMap.builder();
- for (HiveColumnHandle column : columns) {
- readColumns.put(column.getHiveColumnIndex(), column.getHiveType().getType(typeManager));
+ for (HiveColumnHandle column : projectedReaderColumns) {
+ readColumns.put(column.getBaseHiveColumnIndex(), column.getHiveType().getType(typeManager));
}
RcFileReader rcFileReader = new RcFileReader(
@@ -152,7 +160,8 @@ else if (deserializerClassName.equals(ColumnarSerDe.class.getName())) {
length,
DataSize.of(8, Unit.MEGABYTE));
- return Optional.of(new RcFilePageSource(rcFileReader, columns));
+ ConnectorPageSource pageSource = new RcFilePageSource(rcFileReader, projectedReaderColumns);
+ return Optional.of(new ReaderPageSourceWithProjections(pageSource, readerProjections));
}
catch (Throwable e) {
try {
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java
index d310524fbdc7..31b25a34a92f 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java
@@ -13,11 +13,13 @@
*/
package io.prestosql.plugin.hive.s3select;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveRecordCursorProvider;
import io.prestosql.plugin.hive.IonSqlQueryBuilder;
+import io.prestosql.plugin.hive.ReaderProjections;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.RecordCursor;
@@ -37,6 +39,7 @@
import java.util.Set;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
+import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
import static io.prestosql.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static java.util.Objects.requireNonNull;
@@ -55,7 +58,7 @@ public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, PrestoS3Cli
}
@Override
- public Optional<RecordCursor> createRecordCursor(
+ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
Configuration configuration,
ConnectorSession session,
Path path,
@@ -80,12 +83,22 @@ public Optional createRecordCursor(
throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + path, e);
}
+ Optional<ReaderProjections> projectedReaderColumns = projectBaseColumns(columns);
+ // Ignore predicates on partial columns for now.
+ effectivePredicate = effectivePredicate.transform(column -> column.isBaseColumn() ? column : null);
+
String serdeName = getDeserializerClassName(schema);
if (CSV_SERDES.contains(serdeName)) {
+ List<HiveColumnHandle> readerColumns = projectedReaderColumns
+ .map(ReaderProjections::getReaderColumns)
+ .orElse(ImmutableList.of());
+
IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager);
- String ionSqlQuery = queryBuilder.buildSql(columns, effectivePredicate);
+ String ionSqlQuery = queryBuilder.buildSql(readerColumns, effectivePredicate);
S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory);
- return Optional.of(new S3SelectRecordCursor<>(configuration, path, recordReader, length, schema, columns, hiveStorageTimeZone));
+
+ RecordCursor cursor = new S3SelectRecordCursor<>(configuration, path, recordReader, length, schema, readerColumns, hiveStorageTimeZone);
+ return Optional.of(new ReaderRecordCursorWithProjections(cursor, projectedReaderColumns));
}
// unsupported serdes
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/HiveUtil.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/HiveUtil.java
index 83bbe99d1d19..157cc037dfe1 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/HiveUtil.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/HiveUtil.java
@@ -111,6 +111,7 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.prestosql.plugin.hive.HiveColumnHandle.bucketColumnHandle;
+import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.prestosql.plugin.hive.HiveColumnHandle.fileModifiedTimeColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.fileSizeColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.isBucketColumnHandle;
@@ -222,8 +223,12 @@ private HiveUtil()
List readColumns = columns.stream()
.filter(column -> column.getColumnType() == REGULAR)
.collect(toImmutableList());
+
+ // Projected columns are not supported here
+ readColumns.forEach(readColumn -> checkArgument(readColumn.isBaseColumn(), "column %s is not a base column", readColumn.getName()));
+
List readHiveColumnIndexes = readColumns.stream()
- .map(HiveColumnHandle::getHiveColumnIndex)
+ .map(HiveColumnHandle::getBaseHiveColumnIndex)
.collect(toImmutableList());
// Tell hive the columns we would like to read, this lets hive optimize reading column oriented files
@@ -908,7 +913,7 @@ public static List getRegularColumnHandles(Table table, TypeMa
// ignore unsupported types rather than failing
HiveType hiveType = field.getType();
if (hiveType.isSupportedType()) {
- columns.add(new HiveColumnHandle(field.getName(), hiveType, hiveType.getType(typeManager), hiveColumnIndex, REGULAR, field.getComment()));
+ columns.add(createBaseColumn(field.getName(), hiveColumnIndex, hiveType, hiveType.getType(typeManager), REGULAR, field.getComment()));
}
hiveColumnIndex++;
}
@@ -926,7 +931,7 @@ public static List getPartitionKeyColumnHandles(Table table, T
if (!hiveType.isSupportedType()) {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
}
- columns.add(new HiveColumnHandle(field.getName(), hiveType, hiveType.getType(typeManager), -1, PARTITION_KEY, field.getComment()));
+ columns.add(createBaseColumn(field.getName(), -1, hiveType, hiveType.getType(typeManager), PARTITION_KEY, field.getComment()));
}
return columns.build();
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java
index 3a789e963c55..93806c6ef484 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java
@@ -84,12 +84,17 @@
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.DiscretePredicates;
+import io.prestosql.spi.connector.ProjectionApplicationResult;
+import io.prestosql.spi.connector.ProjectionApplicationResult.Assignment;
import io.prestosql.spi.connector.RecordCursor;
import io.prestosql.spi.connector.RecordPageSource;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.TableNotFoundException;
import io.prestosql.spi.connector.ViewNotFoundException;
+import io.prestosql.spi.expression.ConnectorExpression;
+import io.prestosql.spi.expression.FieldDereference;
+import io.prestosql.spi.expression.Variable;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.NullableValue;
import io.prestosql.spi.predicate.Range;
@@ -141,6 +146,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@@ -182,6 +188,7 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.prestosql.plugin.hive.HiveColumnHandle.bucketColumnHandle;
+import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
@@ -640,11 +647,11 @@ protected void setupHive(String databaseName, String timeZoneId)
invalidTableHandle = new HiveTableHandle(database, INVALID_TABLE, ImmutableMap.of(), ImmutableList.of(), Optional.empty());
- dsColumn = new HiveColumnHandle("ds", HIVE_STRING, VARCHAR, -1, PARTITION_KEY, Optional.empty());
- fileFormatColumn = new HiveColumnHandle("file_format", HIVE_STRING, VARCHAR, -1, PARTITION_KEY, Optional.empty());
- dummyColumn = new HiveColumnHandle("dummy", HIVE_INT, INTEGER, -1, PARTITION_KEY, Optional.empty());
- intColumn = new HiveColumnHandle("t_int", HIVE_INT, INTEGER, -1, PARTITION_KEY, Optional.empty());
- invalidColumnHandle = new HiveColumnHandle(INVALID_COLUMN, HIVE_STRING, VARCHAR, 0, REGULAR, Optional.empty());
+ dsColumn = createBaseColumn("ds", -1, HIVE_STRING, VARCHAR, PARTITION_KEY, Optional.empty());
+ fileFormatColumn = createBaseColumn("file_format", -1, HIVE_STRING, VARCHAR, PARTITION_KEY, Optional.empty());
+ dummyColumn = createBaseColumn("dummy", -1, HIVE_INT, INTEGER, PARTITION_KEY, Optional.empty());
+ intColumn = createBaseColumn("t_int", -1, HIVE_INT, INTEGER, PARTITION_KEY, Optional.empty());
+ invalidColumnHandle = createBaseColumn(INVALID_COLUMN, 0, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
List partitionColumns = ImmutableList.of(dsColumn, fileFormatColumn, dummyColumn);
tablePartitionFormatPartitions = ImmutableList.builder()
@@ -2961,6 +2968,147 @@ protected void testPartitionStatisticsSampling(List columns, Par
}
}
+ @Test
+ public void testApplyProjection()
+ throws Exception
+ {
+ ColumnMetadata bigIntColumn0 = new ColumnMetadata("int0", BIGINT);
+ ColumnMetadata bigIntColumn1 = new ColumnMetadata("int1", BIGINT);
+
+ RowType oneLevelRowType = toRowType(ImmutableList.of(bigIntColumn0, bigIntColumn1));
+ ColumnMetadata oneLevelRow0 = new ColumnMetadata("onelevelrow0", oneLevelRowType);
+
+ RowType twoLevelRowType = toRowType(ImmutableList.of(oneLevelRow0, bigIntColumn0, bigIntColumn1));
+ ColumnMetadata twoLevelRow0 = new ColumnMetadata("twolevelrow0", twoLevelRowType);
+
+ List columnsForApplyProjectionTest = ImmutableList.of(bigIntColumn0, bigIntColumn1, oneLevelRow0, twoLevelRow0);
+
+ SchemaTableName tableName = temporaryTable("apply_projection_tester");
+ doCreateEmptyTable(tableName, ORC, columnsForApplyProjectionTest);
+
+ try (Transaction transaction = newTransaction()) {
+ ConnectorSession session = newSession();
+ ConnectorMetadata metadata = transaction.getMetadata();
+ ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
+
+ List columnHandles = metadata.getColumnHandles(session, tableHandle).values().stream()
+ .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
+ .collect(toList());
+ assertEquals(columnHandles.size(), columnsForApplyProjectionTest.size());
+
+ Map columnHandleMap = columnHandles.stream()
+ .collect(toImmutableMap(handle -> ((HiveColumnHandle) handle).getBaseColumnName(), Function.identity()));
+
+ // Emulate symbols coming from the query plan and map them to column handles
+ Map columnHandlesWithSymbols = ImmutableMap.of(
+ "symbol_0", columnHandleMap.get("int0"),
+ "symbol_1", columnHandleMap.get("int1"),
+ "symbol_2", columnHandleMap.get("onelevelrow0"),
+ "symbol_3", columnHandleMap.get("twolevelrow0"));
+
+ // Create variables for the emulated symbols
+ Map symbolVariableMapping = columnHandlesWithSymbols.entrySet().stream()
+ .collect(toImmutableMap(
+ e -> e.getKey(),
+ e -> new Variable(
+ e.getKey(),
+ ((HiveColumnHandle) e.getValue()).getBaseType())));
+
+ // Create dereference expressions for testing
+ FieldDereference symbol2Field0 = new FieldDereference(BIGINT, symbolVariableMapping.get("symbol_2"), 0);
+ FieldDereference symbol3Field0 = new FieldDereference(oneLevelRowType, symbolVariableMapping.get("symbol_3"), 0);
+ FieldDereference symbol3Field0Field0 = new FieldDereference(BIGINT, symbol3Field0, 0);
+ FieldDereference symbol3Field1 = new FieldDereference(BIGINT, symbolVariableMapping.get("symbol_3"), 1);
+
+ Map inputAssignments;
+ List inputProjections;
+ Optional> projectionResult;
+ List expectedProjections;
+ Map expectedAssignments;
+
+ // Test no projection pushdown in case of all variable references
+ inputAssignments = getColumnHandlesFor(columnHandlesWithSymbols, ImmutableList.of("symbol_0", "symbol_1"));
+ inputProjections = ImmutableList.of(symbolVariableMapping.get("symbol_0"), symbolVariableMapping.get("symbol_1"));
+ projectionResult = metadata.applyProjection(session, tableHandle, inputProjections, inputAssignments);
+ assertProjectionResult(projectionResult, true, ImmutableList.of(), ImmutableMap.of());
+
+ // Test projection pushdown for dereferences
+ inputAssignments = getColumnHandlesFor(columnHandlesWithSymbols, ImmutableList.of("symbol_2", "symbol_3"));
+ inputProjections = ImmutableList.of(symbol2Field0, symbol3Field0Field0, symbol3Field1);
+ expectedAssignments = ImmutableMap.of(
+ "onelevelrow0#f_int0", BIGINT,
+ "twolevelrow0#f_onelevelrow0#f_int0", BIGINT,
+ "twolevelrow0#f_int0", BIGINT);
+ expectedProjections = ImmutableList.of(
+ new Variable("onelevelrow0#f_int0", BIGINT),
+ new Variable("twolevelrow0#f_onelevelrow0#f_int0", BIGINT),
+ new Variable("twolevelrow0#f_int0", BIGINT));
+ projectionResult = metadata.applyProjection(session, tableHandle, inputProjections, inputAssignments);
+ assertProjectionResult(projectionResult, false, expectedProjections, expectedAssignments);
+
+ // Test reuse of virtual column handles
+ // Round-1: input projections [symbol_2, symbol_2.int0]. virtual handle is created for symbol_2.int0.
+ inputAssignments = getColumnHandlesFor(columnHandlesWithSymbols, ImmutableList.of("symbol_2"));
+ inputProjections = ImmutableList.of(symbol2Field0, symbolVariableMapping.get("symbol_2"));
+ projectionResult = metadata.applyProjection(session, tableHandle, inputProjections, inputAssignments);
+ expectedProjections = ImmutableList.of(new Variable("onelevelrow0#f_int0", BIGINT), symbolVariableMapping.get("symbol_2"));
+ expectedAssignments = ImmutableMap.of("onelevelrow0#f_int0", BIGINT, "symbol_2", oneLevelRowType);
+ assertProjectionResult(projectionResult, false, expectedProjections, expectedAssignments);
+
+ // Round-2: input projections [symbol_2.int0 and onelevelrow0#f_int0]. Virtual handle is reused.
+ ProjectionApplicationResult.Assignment newlyCreatedColumn = getOnlyElement(projectionResult.get().getAssignments().stream()
+ .filter(handle -> handle.getVariable().equals("onelevelrow0#f_int0"))
+ .collect(toList()));
+ inputAssignments = ImmutableMap.builder()
+ .putAll(getColumnHandlesFor(columnHandlesWithSymbols, ImmutableList.of("symbol_2")))
+ .put(newlyCreatedColumn.getVariable(), newlyCreatedColumn.getColumn())
+ .build();
+ inputProjections = ImmutableList.of(symbol2Field0, new Variable("onelevelrow0#f_int0", BIGINT));
+ projectionResult = metadata.applyProjection(session, tableHandle, inputProjections, inputAssignments);
+ expectedProjections = ImmutableList.of(new Variable("onelevelrow0#f_int0", BIGINT), new Variable("onelevelrow0#f_int0", BIGINT));
+ expectedAssignments = ImmutableMap.of("onelevelrow0#f_int0", BIGINT);
+ assertProjectionResult(projectionResult, false, expectedProjections, expectedAssignments);
+ }
+ finally {
+ dropTable(tableName);
+ }
+ }
+
+ private static Map getColumnHandlesFor(Map columnHandles, List symbols)
+ {
+ return columnHandles.entrySet().stream()
+ .filter(e -> symbols.contains(e.getKey()))
+ .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ private static void assertProjectionResult(Optional> projectionResult, boolean shouldBeEmpty, List expectedProjections, Map expectedAssignments)
+ {
+ if (shouldBeEmpty) {
+ assertTrue(!projectionResult.isPresent(), "expected projectionResult to be empty");
+ return;
+ }
+
+ assertTrue(projectionResult.isPresent(), "expected non-empty projection result");
+
+ ProjectionApplicationResult result = projectionResult.get();
+
+ // Verify projections
+ assertEquals(expectedProjections, result.getProjections());
+
+ // Verify assignments
+ List assignments = result.getAssignments();
+ Map actualAssignments = uniqueIndex(assignments, Assignment::getVariable);
+
+ for (String variable : expectedAssignments.keySet()) {
+ Type expectedType = expectedAssignments.get(variable);
+ assertTrue(actualAssignments.containsKey(variable));
+ assertEquals(actualAssignments.get(variable).getType(), expectedType);
+ assertEquals(((HiveColumnHandle) actualAssignments.get(variable).getColumn()).getType(), expectedType);
+ }
+
+ assertEquals(actualAssignments.size(), expectedAssignments.size());
+ }
+
private ConnectorSession sampleSize(int sampleSize)
{
return getHiveSession(getHiveConfig()
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java
index d559cad4b5e6..cd773f3b29a1 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java
@@ -85,6 +85,8 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
+import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
+import static io.prestosql.plugin.hive.HiveColumnProjectionInfo.generatePartialName;
import static io.prestosql.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static io.prestosql.plugin.hive.HiveTestUtils.SESSION;
import static io.prestosql.plugin.hive.HiveTestUtils.TYPE_MANAGER;
@@ -473,13 +475,47 @@ private static Map asMap(K[] keys, V[] values)
protected List getColumnHandles(List testColumns)
{
List columns = new ArrayList<>();
+ Map hiveColumnIndexes = new HashMap<>();
+
int nextHiveColumnIndex = 0;
for (int i = 0; i < testColumns.size(); i++) {
TestColumn testColumn = testColumns.get(i);
- int columnIndex = testColumn.isPartitionKey() ? -1 : nextHiveColumnIndex++;
- HiveType hiveType = HiveType.valueOf(testColumn.getObjectInspector().getTypeName());
- columns.add(new HiveColumnHandle(testColumn.getName(), hiveType, hiveType.getType(TYPE_MANAGER), columnIndex, testColumn.isPartitionKey() ? PARTITION_KEY : REGULAR, Optional.empty()));
+ int columnIndex;
+ if (testColumn.isPartitionKey()) {
+ columnIndex = -1;
+ }
+ else {
+ if (hiveColumnIndexes.get(testColumn.getBaseName()) != null) {
+ columnIndex = hiveColumnIndexes.get(testColumn.getBaseName());
+ }
+ else {
+ columnIndex = nextHiveColumnIndex++;
+ hiveColumnIndexes.put(testColumn.getBaseName(), columnIndex);
+ }
+ }
+
+ if (testColumn.getDereferenceNames().size() == 0) {
+ HiveType hiveType = HiveType.valueOf(testColumn.getObjectInspector().getTypeName());
+ columns.add(createBaseColumn(testColumn.getName(), columnIndex, hiveType, hiveType.getType(TYPE_MANAGER), testColumn.isPartitionKey() ? PARTITION_KEY : REGULAR, Optional.empty()));
+ }
+ else {
+ HiveType baseHiveType = HiveType.valueOf(testColumn.getBaseObjectInspector().getTypeName());
+ HiveType partialHiveType = baseHiveType.getHiveTypeForDereferences(testColumn.getDereferenceIndices()).get();
+ HiveColumnHandle hiveColumnHandle = new HiveColumnHandle(
+ testColumn.getBaseName(),
+ columnIndex,
+ baseHiveType,
+ baseHiveType.getType(TYPE_MANAGER),
+ Optional.of(new HiveColumnProjectionInfo(
+ testColumn.getDereferenceIndices(),
+ testColumn.getDereferenceNames(),
+ partialHiveType,
+ partialHiveType.getType(TYPE_MANAGER))),
+ testColumn.isPartitionKey() ? PARTITION_KEY : REGULAR,
+ Optional.empty());
+ columns.add(hiveColumnHandle);
+ }
}
return columns;
}
@@ -817,6 +853,10 @@ else if (testColumn.getObjectInspector().getCategory() == Category.PRIMITIVE) {
public static final class TestColumn
{
+ private final String baseName;
+ private final ObjectInspector baseObjectInspector;
+ private final List dereferenceNames;
+ private final List dereferenceIndices;
private final String name;
private final ObjectInspector objectInspector;
private final Object writeValue;
@@ -830,11 +870,30 @@ public TestColumn(String name, ObjectInspector objectInspector, Object writeValu
public TestColumn(String name, ObjectInspector objectInspector, Object writeValue, Object expectedValue, boolean partitionKey)
{
- this.name = requireNonNull(name, "name is null");
+ this(name, objectInspector, ImmutableList.of(), ImmutableList.of(), objectInspector, writeValue, expectedValue, partitionKey);
+ }
+
+ public TestColumn(
+ String baseName,
+ ObjectInspector baseObjectInspector,
+ List dereferenceNames,
+ List dereferenceIndices,
+ ObjectInspector objectInspector,
+ Object writeValue,
+ Object expectedValue,
+ boolean partitionKey)
+ {
+ this.baseName = requireNonNull(baseName, "baseName is null");
+ this.baseObjectInspector = requireNonNull(baseObjectInspector, "baseObjectInspector is null");
+ this.dereferenceNames = requireNonNull(dereferenceNames, "dereferenceNames is null");
+ this.dereferenceIndices = requireNonNull(dereferenceIndices, "dereferenceIndices is null");
+ checkArgument(dereferenceIndices.size() == dereferenceNames.size(), "dereferenceIndices and dereferenceNames should have the same size");
+ this.name = baseName + generatePartialName(dereferenceNames);
this.objectInspector = requireNonNull(objectInspector, "objectInspector is null");
this.writeValue = writeValue;
this.expectedValue = expectedValue;
this.partitionKey = partitionKey;
+ checkArgument(dereferenceNames.size() == 0 || partitionKey == false, "partial column cannot be a partition key");
}
public String getName()
@@ -842,11 +901,31 @@ public String getName()
return name;
}
+ public String getBaseName()
+ {
+ return baseName;
+ }
+
+ public List getDereferenceNames()
+ {
+ return dereferenceNames;
+ }
+
+ public List getDereferenceIndices()
+ {
+ return dereferenceIndices;
+ }
+
public String getType()
{
return objectInspector.getTypeName();
}
+ public ObjectInspector getBaseObjectInspector()
+ {
+ return baseObjectInspector;
+ }
+
public ObjectInspector getObjectInspector()
{
return objectInspector;
@@ -871,7 +950,9 @@ public boolean isPartitionKey()
public String toString()
{
StringBuilder sb = new StringBuilder("TestColumn{");
- sb.append("name='").append(name).append('\'');
+ sb.append("baseName='").append(baseName).append("'");
+ sb.append("dereferenceNames=").append("[").append(dereferenceNames.stream().collect(Collectors.joining(","))).append("]");
+ sb.append("name=").append(name);
sb.append(", objectInspector=").append(objectInspector);
sb.append(", writeValue=").append(writeValue);
sb.append(", expectedValue=").append(expectedValue);
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java
index 5c213466a828..89bdde3e4c9a 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java
@@ -76,6 +76,7 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.prestosql.plugin.hive.BackgroundHiveSplitLoader.BucketSplitInfo.createBucketSplitInfo;
import static io.prestosql.plugin.hive.BackgroundHiveSplitLoader.getBucketNumber;
+import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.prestosql.plugin.hive.HiveColumnHandle.pathColumnHandle;
import static io.prestosql.plugin.hive.HiveStorageFormat.CSV;
import static io.prestosql.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
@@ -124,7 +125,7 @@ public class TestBackgroundHiveSplitLoader
private static final List PARTITION_COLUMNS = ImmutableList.of(
new Column("partitionColumn", HIVE_INT, Optional.empty()));
private static final List BUCKET_COLUMN_HANDLES = ImmutableList.of(
- new HiveColumnHandle("col1", HIVE_INT, INTEGER, 0, ColumnType.REGULAR, Optional.empty()));
+ createBaseColumn("col1", 0, HIVE_INT, INTEGER, ColumnType.REGULAR, Optional.empty()));
private static final Optional BUCKET_PROPERTY = Optional.of(
new HiveBucketProperty(ImmutableList.of("col1"), BUCKETING_V1, BUCKET_COUNT, ImmutableList.of()));
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveApplyProjectionUtil.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveApplyProjectionUtil.java
new file mode 100644
index 000000000000..55fe7dfb83d1
--- /dev/null
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveApplyProjectionUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.hive;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.expression.ConnectorExpression;
+import io.prestosql.spi.expression.Constant;
+import io.prestosql.spi.expression.FieldDereference;
+import io.prestosql.spi.expression.Variable;
+import org.testng.annotations.Test;
+
+import static io.prestosql.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns;
+import static io.prestosql.plugin.hive.HiveApplyProjectionUtil.isPushDownSupported;
+import static io.prestosql.spi.type.IntegerType.INTEGER;
+import static io.prestosql.spi.type.RowType.field;
+import static io.prestosql.spi.type.RowType.rowType;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class TestHiveApplyProjectionUtil
+{
+ private static final ConnectorExpression ROW_OF_ROW_VARIABLE = new Variable("a", rowType(field("b", rowType(field("c", INTEGER)))));
+
+ private static final ConnectorExpression ONE_LEVEL_DEREFERENCE = new FieldDereference(
+ rowType(field("c", INTEGER)),
+ ROW_OF_ROW_VARIABLE,
+ 0);
+
+ private static final ConnectorExpression TWO_LEVEL_DEREFERENCE = new FieldDereference(
+ INTEGER,
+ ONE_LEVEL_DEREFERENCE,
+ 0);
+
+ private static final ConnectorExpression INT_VARIABLE = new Variable("a", INTEGER);
+ private static final ConnectorExpression CONSTANT = new Constant(5, INTEGER);
+
+ @Test
+ public void testIsProjectionSupported()
+ {
+ assertTrue(isPushDownSupported(ONE_LEVEL_DEREFERENCE));
+ assertTrue(isPushDownSupported(TWO_LEVEL_DEREFERENCE));
+ assertTrue(isPushDownSupported(INT_VARIABLE));
+ assertFalse(isPushDownSupported(CONSTANT));
+ }
+
+ @Test
+ public void testExtractSupportedProjectionColumns()
+ {
+ assertEquals(extractSupportedProjectedColumns(ONE_LEVEL_DEREFERENCE), ImmutableList.of(ONE_LEVEL_DEREFERENCE));
+ assertEquals(extractSupportedProjectedColumns(TWO_LEVEL_DEREFERENCE), ImmutableList.of(TWO_LEVEL_DEREFERENCE));
+ assertEquals(extractSupportedProjectedColumns(INT_VARIABLE), ImmutableList.of(INT_VARIABLE));
+ assertEquals(extractSupportedProjectedColumns(CONSTANT), ImmutableList.of());
+ }
+}
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveColumnHandle.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveColumnHandle.java
index 5ca36851644f..0084575d88f1 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveColumnHandle.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveColumnHandle.java
@@ -13,19 +13,28 @@
*/
package io.prestosql.plugin.hive;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.airlift.json.ObjectMapperProvider;
-import io.prestosql.spi.type.TestingTypeManager;
+import io.prestosql.spi.type.RowType;
import io.prestosql.spi.type.Type;
+import io.prestosql.type.InternalTypeManager;
import org.testng.annotations.Test;
import java.util.Optional;
+import static io.prestosql.metadata.MetadataManager.createTestMetadataManager;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
+import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
+import static io.prestosql.plugin.hive.HiveType.toHiveType;
+import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
+import static io.prestosql.spi.type.RowType.field;
+import static io.prestosql.spi.type.VarcharType.VARCHAR;
+import static java.util.Arrays.asList;
import static org.testng.Assert.assertEquals;
public class TestHiveColumnHandle
@@ -40,29 +49,60 @@ public void testHiddenColumn()
@Test
public void testRegularColumn()
{
- HiveColumnHandle expectedPartitionColumn = new HiveColumnHandle("name", HiveType.HIVE_FLOAT, DOUBLE, 88, PARTITION_KEY, Optional.empty());
+ HiveColumnHandle expectedPartitionColumn = createBaseColumn("name", 88, HiveType.HIVE_FLOAT, DOUBLE, PARTITION_KEY, Optional.empty());
testRoundTrip(expectedPartitionColumn);
}
@Test
public void testPartitionKeyColumn()
{
- HiveColumnHandle expectedRegularColumn = new HiveColumnHandle("name", HiveType.HIVE_FLOAT, DOUBLE, 88, REGULAR, Optional.empty());
+ HiveColumnHandle expectedRegularColumn = createBaseColumn("name", 88, HiveType.HIVE_FLOAT, DOUBLE, REGULAR, Optional.empty());
testRoundTrip(expectedRegularColumn);
}
+ @Test
+ public void testProjectedColumn()
+ {
+ Type baseType = RowType.from(asList(field("a", VARCHAR), field("b", BIGINT)));
+ HiveType baseHiveType = toHiveType(new HiveTypeTranslator(), baseType);
+
+ HiveColumnProjectionInfo columnProjectionInfo = new HiveColumnProjectionInfo(
+ ImmutableList.of(1),
+ ImmutableList.of("b"),
+ HiveType.HIVE_LONG,
+ BIGINT);
+
+ HiveColumnHandle projectedColumn = new HiveColumnHandle(
+ "struct_col",
+ 88,
+ baseHiveType,
+ baseType,
+ Optional.of(columnProjectionInfo),
+ REGULAR,
+ Optional.empty());
+
+ testRoundTrip(projectedColumn);
+ }
+
private void testRoundTrip(HiveColumnHandle expected)
{
ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider();
- objectMapperProvider.setJsonDeserializers(ImmutableMap.of(Type.class, new HiveModule.TypeDeserializer(new TestingTypeManager())));
+ objectMapperProvider.setJsonDeserializers(ImmutableMap.of(Type.class, new HiveModule.TypeDeserializer(new InternalTypeManager(createTestMetadataManager()))));
JsonCodec codec = new JsonCodecFactory(objectMapperProvider).jsonCodec(HiveColumnHandle.class);
String json = codec.toJson(expected);
HiveColumnHandle actual = codec.fromJson(json);
+ assertEquals(actual.getBaseColumnName(), expected.getBaseColumnName());
+ assertEquals(actual.getBaseHiveColumnIndex(), expected.getBaseHiveColumnIndex());
+ assertEquals(actual.getBaseType(), expected.getBaseType());
+ assertEquals(actual.getBaseHiveType(), expected.getBaseHiveType());
+
assertEquals(actual.getName(), expected.getName());
+ assertEquals(actual.getType(), expected.getType());
assertEquals(actual.getHiveType(), expected.getHiveType());
- assertEquals(actual.getHiveColumnIndex(), expected.getHiveColumnIndex());
+
+ assertEquals(actual.getHiveColumnProjectionInfo(), expected.getHiveColumnProjectionInfo());
assertEquals(actual.isPartitionKey(), expected.isPartitionKey());
}
}
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveFileFormats.java
index f2892c16fb84..38fd8e801aa2 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveFileFormats.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveFileFormats.java
@@ -34,6 +34,7 @@
import io.prestosql.spi.connector.RecordCursor;
import io.prestosql.spi.connector.RecordPageSource;
import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.type.Type;
import io.prestosql.testing.TestingConnectorSession;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -55,14 +56,18 @@
import java.io.File;
import java.io.IOException;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
+import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
import static io.prestosql.plugin.hive.HiveStorageFormat.AVRO;
@@ -80,10 +85,12 @@
import static io.prestosql.plugin.hive.HiveTestUtils.createGenericHiveRecordCursorProvider;
import static io.prestosql.plugin.hive.HiveTestUtils.getHiveSession;
import static io.prestosql.plugin.hive.HiveTestUtils.getTypes;
+import static io.prestosql.testing.StructuralTestUtil.rowBlockOf;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector;
import static org.testng.Assert.assertEquals;
@@ -523,6 +530,254 @@ public void testTruncateVarcharColumn()
.isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
}
+ @Test(dataProvider = "rowCount")
+ public void testAvroProjectedColumns(int rowCount)
+ throws Exception
+ {
+ List supportedColumns = getTestColumnsSupportedByAvro();
+ List regularColumns = getRegularColumns(supportedColumns);
+ List partitionColumns = getPartitionColumns(supportedColumns);
+
+ // Create projected columns for all regular supported columns
+ ImmutableList.Builder writeColumnsBuilder = ImmutableList.builder();
+ ImmutableList.Builder readeColumnsBuilder = ImmutableList.builder();
+ generateProjectedColumns(regularColumns, writeColumnsBuilder, readeColumnsBuilder);
+
+ List writeColumns = writeColumnsBuilder.addAll(partitionColumns).build();
+ List