-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Skip pre hash computation for join when input is table scan #20948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import com.facebook.presto.spi.plan.PlanNode; | ||
| import com.facebook.presto.spi.plan.PlanNodeIdAllocator; | ||
| import com.facebook.presto.spi.plan.ProjectNode; | ||
| import com.facebook.presto.spi.plan.TableScanNode; | ||
| import com.facebook.presto.spi.plan.UnionNode; | ||
| import com.facebook.presto.spi.relation.CallExpression; | ||
| import com.facebook.presto.spi.relation.RowExpression; | ||
|
|
@@ -65,6 +66,7 @@ | |
| import java.util.Set; | ||
| import java.util.function.Function; | ||
|
|
||
| import static com.facebook.presto.SystemSessionProperties.skipHashGenerationForJoinWithTableScanInput; | ||
| import static com.facebook.presto.common.type.BigintType.BIGINT; | ||
| import static com.facebook.presto.spi.plan.ProjectNode.Locality.LOCAL; | ||
| import static com.facebook.presto.spi.plan.ProjectNode.Locality.REMOTE; | ||
|
|
@@ -121,7 +123,7 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider | |
| requireNonNull(variableAllocator, "variableAllocator is null"); | ||
| requireNonNull(idAllocator, "idAllocator is null"); | ||
| if (isEnabled(session)) { | ||
| PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, functionAndTypeManager).accept(plan, new HashComputationSet()); | ||
| PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, functionAndTypeManager, session).accept(plan, new HashComputationSet()); | ||
| return PlanOptimizerResult.optimizerResult(result.getNode(), true); | ||
| } | ||
| return PlanOptimizerResult.optimizerResult(plan, false); | ||
|
|
@@ -133,12 +135,14 @@ private static class Rewriter | |
| private final PlanNodeIdAllocator idAllocator; | ||
| private final VariableAllocator variableAllocator; | ||
| private final FunctionAndTypeManager functionAndTypeManager; | ||
| private final Session session; | ||
|
|
||
| private Rewriter(PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager) | ||
| private Rewriter(PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager, Session session) | ||
| { | ||
| this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); | ||
| this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null"); | ||
| this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionManager is null"); | ||
| this.session = requireNonNull(session, "session is null"); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -315,6 +319,11 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, HashComputa | |
| child.getHashVariables()); | ||
| } | ||
|
|
||
| private boolean skipHashComputeForJoinInput(PlanNode node, Optional<HashComputation> hashComputation, HashComputationSet parentPreference) | ||
| { | ||
| return node instanceof TableScanNode && hashComputation.isPresent() && hashComputation.get().isSingleBigIntVariable() && !parentPreference.getHashes().contains(hashComputation.get()); | ||
|
||
| } | ||
|
|
||
| @Override | ||
| public PlanWithProperties visitJoin(JoinNode node, HashComputationSet parentPreference) | ||
| { | ||
|
|
@@ -333,13 +342,19 @@ public PlanWithProperties visitJoin(JoinNode node, HashComputationSet parentPref | |
| // join does not pass through preferred hash variables since they take more memory and since | ||
| // the join node filters, may take more compute | ||
| Optional<HashComputation> leftHashComputation = computeHash(Lists.transform(clauses, JoinNode.EquiJoinClause::getLeft), functionAndTypeManager); | ||
| if (skipHashGenerationForJoinWithTableScanInput(session) && skipHashComputeForJoinInput(node.getLeft(), leftHashComputation, parentPreference)) { | ||
| leftHashComputation = Optional.empty(); | ||
| } | ||
| PlanWithProperties left = planAndEnforce(node.getLeft(), new HashComputationSet(leftHashComputation), true, new HashComputationSet(leftHashComputation)); | ||
| VariableReferenceExpression leftHashVariable = left.getRequiredHashVariable(leftHashComputation.get()); | ||
| Optional<VariableReferenceExpression> leftHashVariable = leftHashComputation.isPresent() ? Optional.of(left.getRequiredHashVariable(leftHashComputation.get())) : Optional.empty(); | ||
|
|
||
| Optional<HashComputation> rightHashComputation = computeHash(Lists.transform(clauses, JoinNode.EquiJoinClause::getRight), functionAndTypeManager); | ||
| if (skipHashGenerationForJoinWithTableScanInput(session) && skipHashComputeForJoinInput(node.getRight(), rightHashComputation, parentPreference)) { | ||
| rightHashComputation = Optional.empty(); | ||
| } | ||
| // drop undesired hash variables from build to save memory | ||
| PlanWithProperties right = planAndEnforce(node.getRight(), new HashComputationSet(rightHashComputation), true, new HashComputationSet(rightHashComputation)); | ||
| VariableReferenceExpression rightHashVariable = right.getRequiredHashVariable(rightHashComputation.get()); | ||
| Optional<VariableReferenceExpression> rightHashVariable = rightHashComputation.isPresent() ? Optional.of(right.getRequiredHashVariable(rightHashComputation.get())) : Optional.empty(); | ||
|
|
||
| // build map of all hash variables | ||
| // NOTE: Full outer join doesn't use hash variables | ||
|
|
@@ -351,7 +366,7 @@ public PlanWithProperties visitJoin(JoinNode node, HashComputationSet parentPref | |
| allHashVariables.putAll(right.getHashVariables()); | ||
| } | ||
|
|
||
| return buildJoinNodeWithPreferredHashes(node, left, right, allHashVariables, parentPreference, Optional.of(leftHashVariable), Optional.of(rightHashVariable)); | ||
| return buildJoinNodeWithPreferredHashes(node, left, right, allHashVariables, parentPreference, leftHashVariable, rightHashVariable); | ||
| } | ||
|
|
||
| private PlanWithProperties buildJoinNodeWithPreferredHashes( | ||
|
|
@@ -929,6 +944,11 @@ public boolean canComputeWith(Set<VariableReferenceExpression> availableFields) | |
| return availableFields.containsAll(fields); | ||
| } | ||
|
|
||
| public boolean isSingleBigIntVariable() | ||
| { | ||
| return fields.size() == 1 && Iterables.getOnlyElement(fields).getType().equals(BIGINT); | ||
| } | ||
|
|
||
| private RowExpression getHashExpression() | ||
| { | ||
| RowExpression hashExpression = constant(INITIAL_HASH_VALUE, BIGINT); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.presto.sql.planner.optimizations; | ||
|
|
||
| import com.facebook.presto.Session; | ||
| import com.facebook.presto.sql.planner.assertions.BasePlanTest; | ||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.collect.ImmutableMap; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; | ||
| import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY; | ||
| import static com.facebook.presto.SystemSessionProperties.SKIP_HASH_GENERATION_FOR_JOIN_WITH_TABLE_SCAN_INPUT; | ||
| import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST; | ||
| import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS; | ||
| import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; | ||
| import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause; | ||
| import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; | ||
| import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join; | ||
| import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; | ||
| import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; | ||
| import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; | ||
| import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; | ||
| import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER; | ||
|
|
||
| public class TestHashGenerationOptimizer | ||
| extends BasePlanTest | ||
| { | ||
| @Test | ||
| public void testSkipHashGenerationForJoinWithTableScanInput() | ||
| { | ||
| Session enable = Session.builder(this.getQueryRunner().getDefaultSession()) | ||
| .setSystemProperty(JOIN_REORDERING_STRATEGY, ELIMINATE_CROSS_JOINS.name()) | ||
| .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) | ||
| .setSystemProperty(SKIP_HASH_GENERATION_FOR_JOIN_WITH_TABLE_SCAN_INPUT, "true") | ||
| .build(); | ||
| assertPlanWithSession("select * from lineitem l join orders o on l.partkey=o.custkey", | ||
| enable, | ||
| false, | ||
| anyTree( | ||
| join(INNER, ImmutableList.of(equiJoinClause("partkey", "custkey")), | ||
| tableScan("lineitem", ImmutableMap.of("partkey", "partkey")), | ||
| anyTree( | ||
| exchange(REMOTE_STREAMING, REPLICATE, | ||
| anyTree( | ||
| tableScan("orders", ImmutableMap.of("custkey", "custkey")))))))); | ||
|
|
||
| Session disable = Session.builder(this.getQueryRunner().getDefaultSession()) | ||
| .setSystemProperty(JOIN_REORDERING_STRATEGY, ELIMINATE_CROSS_JOINS.name()) | ||
| .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) | ||
| .setSystemProperty(SKIP_HASH_GENERATION_FOR_JOIN_WITH_TABLE_SCAN_INPUT, "false") | ||
| .build(); | ||
| assertPlanWithSession("select * from lineitem l join orders o on l.partkey=o.custkey", | ||
| disable, | ||
| false, | ||
| anyTree( | ||
| join(INNER, ImmutableList.of(equiJoinClause("partkey", "custkey")), | ||
| project( | ||
| tableScan("lineitem", ImmutableMap.of("partkey", "partkey"))), | ||
| anyTree( | ||
| exchange(REMOTE_STREAMING, REPLICATE, | ||
| anyTree( | ||
| tableScan("orders", ImmutableMap.of("custkey", "custkey")))))))); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious why we would ever add this hash computation when the parent does not require it. I.e. wouldn't this check simply always be
"return hashComputation.isPresent() && !parentPreference.getHashes().contains(hashComputation.get());"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This optimization is based on our observation, where TableScan below join is significantly than ScanProject (here project is for hash generation) for big int join key. Do not observe the same for other cases.