Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ public void testPartitionStatisticsSampling()
// Alluxio metastore does not support create operations
}

@Override
public void testApplyProjection()
{
    // Intentionally empty: overriding disables the inherited test.
    // Alluxio metastore does not support create/delete operations,
    // which the base test needs for its setup/teardown.
}

@Override
public void testPreferredInsertLayout()
{
Expand Down
7 changes: 7 additions & 0 deletions presto-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.prestosql</groupId>
<artifactId>presto-main</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.prestosql</groupId>
<artifactId>presto-main</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

Expand All @@ -59,7 +60,7 @@ public GenericHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment, DataSize
}

@Override
public Optional<RecordCursor> createRecordCursor(
public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
Configuration configuration,
ConnectorSession session,
Path path,
Expand All @@ -83,18 +84,32 @@ public Optional<RecordCursor> createRecordCursor(
throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + path, e);
}

return hdfsEnvironment.doAs(session.getUser(), () -> {
RecordReader<?, ?> recordReader = HiveUtil.createRecordReader(configuration, path, start, length, schema, columns);
Optional<ReaderProjections> projectedReaderColumns = projectBaseColumns(columns);

return Optional.of(new GenericHiveRecordCursor<>(
RecordCursor cursor = hdfsEnvironment.doAs(session.getUser(), () -> {
RecordReader<?, ?> recordReader = HiveUtil.createRecordReader(
configuration,
path,
start,
length,
schema,
projectedReaderColumns
.map(ReaderProjections::getReaderColumns)
.orElse(columns));

return new GenericHiveRecordCursor<>(
configuration,
path,
genericRecordReader(recordReader),
length,
schema,
columns,
hiveStorageTimeZone));
projectedReaderColumns
.map(ReaderProjections::getReaderColumns)
.orElse(columns),
hiveStorageTimeZone);
});

return Optional.of(new ReaderRecordCursorWithProjections(cursor, projectedReaderColumns));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.hive;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.expression.ConnectorExpression;
import io.prestosql.spi.expression.FieldDereference;
import io.prestosql.spi.expression.Variable;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

final class HiveApplyProjectionUtil
{
    private HiveApplyProjectionUtil() {}

    /**
     * Returns the maximal sub-expressions of {@code expression} that are supported for projection
     * push-down, i.e. plain variables and chains of field dereferences ending in a variable.
     * If the whole expression is supported, it is returned as the only element.
     */
    public static List<ConnectorExpression> extractSupportedProjectedColumns(ConnectorExpression expression)
    {
        requireNonNull(expression, "expression is null");
        ImmutableList.Builder<ConnectorExpression> supportedSubExpressions = ImmutableList.builder();
        fillSupportedProjectedColumns(expression, supportedSubExpressions);
        return supportedSubExpressions.build();
    }

    private static void fillSupportedProjectedColumns(ConnectorExpression expression, ImmutableList.Builder<ConnectorExpression> supportedSubExpressions)
    {
        if (isPushDownSupported(expression)) {
            supportedSubExpressions.add(expression);
            return;
        }

        // If the whole expression is not supported, look for a partially supported projection
        if (expression instanceof FieldDereference) {
            fillSupportedProjectedColumns(((FieldDereference) expression).getTarget(), supportedSubExpressions);
        }
    }

    /**
     * Returns true when {@code expression} can be pushed down: a {@link Variable}, or a chain of
     * {@link FieldDereference}s whose innermost target is a variable.
     */
    @VisibleForTesting
    static boolean isPushDownSupported(ConnectorExpression expression)
    {
        return expression instanceof Variable ||
                (expression instanceof FieldDereference && isPushDownSupported(((FieldDereference) expression).getTarget()));
    }

    /**
     * Decomposes a dereference chain into its base {@link Variable} and the list of dereferenced
     * field ordinals, ordered from the base outward.
     *
     * @throws IllegalArgumentException if {@code expression} is not a variable or a dereference chain
     */
    public static ProjectedColumnRepresentation createProjectedColumnRepresentation(ConnectorExpression expression)
    {
        ImmutableList.Builder<Integer> ordinals = ImmutableList.builder();

        // Walk the dereference chain from the outermost expression down to the base variable,
        // collecting field ordinals along the way (outermost first).
        ConnectorExpression current = expression;
        while (!(current instanceof Variable)) {
            if (!(current instanceof FieldDereference)) {
                throw new IllegalArgumentException("expression is not a valid dereference chain");
            }
            FieldDereference dereference = (FieldDereference) current;
            ordinals.add(dereference.getField());
            current = dereference.getTarget();
        }

        // Ordinals were collected outermost-first; reverse so the base column's ordinal comes first
        return new ProjectedColumnRepresentation((Variable) current, ordinals.build().reverse());
    }

    /**
     * Replaces all connector expressions with variables as given by {@code expressionToVariableMappings}
     * in a top-down manner, i.e. if the replacement occurs for the parent, the children will not be visited.
     */
    public static ConnectorExpression replaceWithNewVariables(ConnectorExpression expression, Map<ConnectorExpression, Variable> expressionToVariableMappings)
    {
        if (expressionToVariableMappings.containsKey(expression)) {
            return expressionToVariableMappings.get(expression);
        }

        if (expression instanceof FieldDereference) {
            ConnectorExpression newTarget = replaceWithNewVariables(((FieldDereference) expression).getTarget(), expressionToVariableMappings);
            return new FieldDereference(expression.getType(), newTarget, ((FieldDereference) expression).getField());
        }

        return expression;
    }

    /**
     * Returns the assignment key corresponding to the column represented by {@code projectedColumn}
     * in {@code assignments}, if one exists.
     * The variable in {@code projectedColumn} can itself be a representation of another projected column. For example,
     * say a projected column representation has variable "x" and a dereferenceIndices=[0]. "x" can in-turn map to a projected
     * column handle with base="a" and [1, 2] as dereference indices. Then the method searches for a column handle in
     * {@code assignments} with base="a" and dereferenceIndices=[1, 2, 0].
     */
    public static Optional<String> find(Map<String, ColumnHandle> assignments, ProjectedColumnRepresentation projectedColumn)
    {
        HiveColumnHandle variableColumn = (HiveColumnHandle) assignments.get(projectedColumn.getVariable().getName());

        if (variableColumn == null) {
            return Optional.empty();
        }

        String baseColumnName = variableColumn.getBaseColumnName();

        // Indices already applied by the variable's own column handle, if it is itself a projection
        List<Integer> variableColumnIndices = variableColumn.getHiveColumnProjectionInfo()
                .map(HiveColumnProjectionInfo::getDereferenceIndices)
                .orElse(ImmutableList.of());

        // Full dereference path: the variable's indices followed by the projected column's indices
        List<Integer> projectionIndices = ImmutableList.<Integer>builder()
                .addAll(variableColumnIndices)
                .addAll(projectedColumn.getDereferenceIndices())
                .build();

        for (Map.Entry<String, ColumnHandle> entry : assignments.entrySet()) {
            HiveColumnHandle column = (HiveColumnHandle) entry.getValue();
            if (column.getBaseColumnName().equals(baseColumnName) &&
                    column.getHiveColumnProjectionInfo()
                            .map(HiveColumnProjectionInfo::getDereferenceIndices)
                            .orElse(ImmutableList.of())
                            .equals(projectionIndices)) {
                return Optional.of(entry.getKey());
            }
        }

        return Optional.empty();
    }

    /**
     * A projected column expressed as a base {@link Variable} plus the ordered list of field
     * ordinals dereferenced from it. Immutable; equality is over both components.
     */
    public static class ProjectedColumnRepresentation
    {
        private final Variable variable;
        private final List<Integer> dereferenceIndices;

        public ProjectedColumnRepresentation(Variable variable, List<Integer> dereferenceIndices)
        {
            this.variable = requireNonNull(variable, "variable is null");
            this.dereferenceIndices = ImmutableList.copyOf(requireNonNull(dereferenceIndices, "dereferenceIndices is null"));
        }

        public Variable getVariable()
        {
            return variable;
        }

        public List<Integer> getDereferenceIndices()
        {
            return dereferenceIndices;
        }

        /**
         * Returns true when this represents the base variable itself (no dereferences).
         */
        public boolean isVariable()
        {
            return dereferenceIndices.isEmpty();
        }

        @Override
        public boolean equals(Object obj)
        {
            if (this == obj) {
                return true;
            }
            if ((obj == null) || (getClass() != obj.getClass())) {
                return false;
            }
            ProjectedColumnRepresentation that = (ProjectedColumnRepresentation) obj;
            return Objects.equals(variable, that.variable) &&
                    Objects.equals(dereferenceIndices, that.dereferenceIndices);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(variable, dereferenceIndices);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

Expand All @@ -41,6 +43,7 @@ public HiveBucketHandle(
@JsonProperty("readBucketCount") int readBucketCount)
{
this.columns = requireNonNull(columns, "columns is null");
columns.forEach(column -> checkArgument(column.isBaseColumn(), format("projected column %s is not allowed for bucketing", column)));
this.bucketingVersion = requireNonNull(bucketingVersion, "bucketingVersion is null");
this.tableBucketCount = tableBucketCount;
this.readBucketCount = readBucketCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public HiveCoercionRecordCursor(
for (int columnIndex = 0; columnIndex < size; columnIndex++) {
ColumnMapping columnMapping = columnMappings.get(columnIndex);

if (columnMapping.getCoercionFrom().isPresent()) {
coercers[columnIndex] = createCoercer(typeManager, columnMapping.getCoercionFrom().get(), columnMapping.getHiveColumnHandle().getHiveType(), bridgingRecordCursor);
if (columnMapping.getBaseTypeCoercionFrom().isPresent()) {
coercers[columnIndex] = createCoercer(typeManager, columnMapping.getBaseTypeCoercionFrom().get(), columnMapping.getHiveColumnHandle().getHiveType(), bridgingRecordCursor);
}
}
}
Expand Down
Loading