Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ public HiveParquetDereferencePushDown(HiveTransactionManager transactionManager,
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
}

private static Map<RowExpression, Subfield> extractDereferences(ConnectorSession session, ExpressionOptimizer expressionOptimizer, Set<RowExpression> expressions)
private static Map<RowExpression, Subfield> extractDereferences(
Map<String, HiveColumnHandle> regularHiveColumnHandles,
ConnectorSession session, ExpressionOptimizer expressionOptimizer,
Set<RowExpression> expressions)
{
Set<RowExpression> dereferenceAndVariableExpressions = new HashSet<>();
expressions.forEach(e -> e.accept(new ExtractDereferenceAndVariables(session, expressionOptimizer), dereferenceAndVariableExpressions));
Expand All @@ -90,7 +93,8 @@ private static Map<RowExpression, Subfield> extractDereferences(ConnectorSession
.filter(expression -> expression instanceof SpecialFormExpression && ((SpecialFormExpression) expression).getForm() == DEREFERENCE)
.collect(Collectors.toList());

return dereferences.stream().collect(toMap(identity(), dereference -> createNestedColumn(dereference, expressionOptimizer, session)));
return dereferences.stream().collect(toMap(identity(), dereference -> createNestedColumn(
regularHiveColumnHandles, dereference, expressionOptimizer, session)));
}

private static boolean prefixExists(RowExpression expression, Set<RowExpression> allExpressions)
Expand Down Expand Up @@ -128,7 +132,9 @@ public Void visitVariableReference(VariableReferenceExpression reference, int[]
return referenceCount[0] > 1;
}

private static Subfield createNestedColumn(RowExpression rowExpression, ExpressionOptimizer expressionOptimizer, ConnectorSession session)
private static Subfield createNestedColumn(Map<String, HiveColumnHandle> regularHiveColumnHandles,
RowExpression rowExpression, ExpressionOptimizer expressionOptimizer,
ConnectorSession session)
{
if (!(rowExpression instanceof SpecialFormExpression) || ((SpecialFormExpression) rowExpression).getForm() != DEREFERENCE) {
throw new IllegalArgumentException("expecting SpecialFormExpression(DEREFERENCE), but got: " + rowExpression);
Expand All @@ -138,7 +144,11 @@ private static Subfield createNestedColumn(RowExpression rowExpression, Expressi
while (true) {
if (rowExpression instanceof VariableReferenceExpression) {
Collections.reverse(elements);
return new Subfield(((VariableReferenceExpression) rowExpression).getName(), unmodifiableList(elements));
String name = ((VariableReferenceExpression) rowExpression).getName();
HiveColumnHandle handle = regularHiveColumnHandles.get(name);
checkArgument(handle != null, "Missing Hive column handle: " + name);
String originalColumnName = regularHiveColumnHandles.get(name).getName();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a check to make sure regularHiveColumnHandles.get(name) returns non-null?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a Preconditions check

return new Subfield(originalColumnName, unmodifiableList(elements));
}

if (rowExpression instanceof SpecialFormExpression && ((SpecialFormExpression) rowExpression).getForm() == DEREFERENCE) {
Expand Down Expand Up @@ -329,18 +339,22 @@ public PlanNode visitProject(ProjectNode project, Void context)
if (!isParquetDereferenceEnabled(session, tableScan.getTable())) {
return visitPlan(project, context);
}
Map<String, HiveColumnHandle> regularHiveColumnHandles = new HashMap<>();
regularHiveColumnHandles.putAll(tableScan.getAssignments().entrySet().stream()
.collect(toMap(e -> e.getKey().getName(), e -> (HiveColumnHandle) e.getValue())));
regularHiveColumnHandles.putAll(tableScan.getAssignments().values().stream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this mapping (columnName in HiveColumnHandle -> HiveColumnHandle)? Expressions should be referring to the name in Assignments. Is that not the case in some cases?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, are you referring to the second putAll of the two here? If I remove this mapping, I get the
nested column [X.Y]'s base column X is not present in table scan output
error in the immediately following logic. That place uses the subfield name to look up the HiveColumnHandle, and now the subfield has the same name as the original HiveColumnHandle, so I used the same map to do this lookup.

.map(columnHandle -> (HiveColumnHandle) columnHandle)
.collect(toMap(HiveColumnHandle::getName, identity())));

Map<RowExpression, Subfield> dereferenceToNestedColumnMap = extractDereferences(
regularHiveColumnHandles,
session,
rowExpressionService.getExpressionOptimizer(),
new HashSet<>(project.getAssignments().getExpressions()));
if (dereferenceToNestedColumnMap.isEmpty()) {
return visitPlan(project, context);
}

Map<String, HiveColumnHandle> regularHiveColumnHandles = tableScan.getAssignments().entrySet().stream()
.collect(toMap(e -> e.getKey().getName(), e -> (HiveColumnHandle) e.getValue()));

List<VariableReferenceExpression> newOutputVariables = new ArrayList<>(tableScan.getOutputVariables());
Map<VariableReferenceExpression, ColumnHandle> newAssignments = new HashMap<>(tableScan.getAssignments());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@ public void testParquetDereferencePushDown()
anyTree(
node(JoinNode.class,
anyTree(tableScanParquetDeferencePushDowns("test_pushdown_nestedcolumn_parquet", nestedColumnMap("x.a"))),
anyTree(tableScanParquetDeferencePushDowns("test_pushdown_nestedcolumn_parquet", nestedColumnMap("x_1.a"))))));
anyTree(tableScanParquetDeferencePushDowns("test_pushdown_nestedcolumn_parquet", nestedColumnMap("x.a"))))));

// Aggregation
assertParquetDereferencePushDown("SELECT id, min(x.a) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
Expand Down