Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinNotNullInferenceStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.LeftJoinArrayContainsToInnerJoinStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialAggregationStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy;
Expand Down Expand Up @@ -287,6 +288,7 @@ public final class SystemSessionProperties
public static final String REWRITE_CROSS_JOIN_OR_TO_INNER_JOIN = "rewrite_cross_join_or_to_inner_join";
public static final String REWRITE_CROSS_JOIN_ARRAY_CONTAINS_TO_INNER_JOIN = "rewrite_cross_join_array_contains_to_inner_join";
public static final String REWRITE_CROSS_JOIN_ARRAY_NOT_CONTAINS_TO_ANTI_JOIN = "rewrite_cross_join_array_not_contains_to_anti_join";
public static final String REWRITE_LEFT_JOIN_ARRAY_CONTAINS_TO_EQUI_JOIN = "rewrite_left_join_array_contains_to_equi_join";
public static final String REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN = "rewrite_left_join_null_filter_to_semi_join";
public static final String USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN = "use_broadcast_when_buildsize_small_probeside_unknown";
public static final String ADD_PARTIAL_NODE_FOR_ROW_NUMBER_WITH_LIMIT = "add_partial_node_for_row_number_with_limit";
Expand Down Expand Up @@ -1734,6 +1736,18 @@ public SystemSessionProperties(
"Rewrite cross join with array not contains filter to anti join",
featuresConfig.isRewriteCrossJoinWithArrayNotContainsFilterToAntiJoin(),
false),
new PropertyMetadata<>(
REWRITE_LEFT_JOIN_ARRAY_CONTAINS_TO_EQUI_JOIN,
format("Set the strategy used to convert left join with array contains to inner join. Options are: %s",
Stream.of(LeftJoinArrayContainsToInnerJoinStrategy.values())
.map(LeftJoinArrayContainsToInnerJoinStrategy::name)
.collect(joining(","))),
VARCHAR,
LeftJoinArrayContainsToInnerJoinStrategy.class,
featuresConfig.getLeftJoinWithArrayContainsToEquiJoinStrategy(),
false,
value -> LeftJoinArrayContainsToInnerJoinStrategy.valueOf(((String) value).toUpperCase()),
LeftJoinArrayContainsToInnerJoinStrategy::name),
new PropertyMetadata<>(
JOINS_NOT_NULL_INFERENCE_STRATEGY,
format("Set the strategy used NOT NULL filter inference on Join Nodes. Options are: %s",
Expand Down Expand Up @@ -2955,6 +2969,11 @@ public static boolean isRewriteCrossJoinArrayNotContainsToAntiJoinEnabled(Sessio
return session.getSystemProperty(REWRITE_CROSS_JOIN_ARRAY_NOT_CONTAINS_TO_ANTI_JOIN, Boolean.class);
}

public static LeftJoinArrayContainsToInnerJoinStrategy getLeftJoinArrayContainsToInnerJoinStrategy(Session session)
{
return session.getSystemProperty(REWRITE_LEFT_JOIN_ARRAY_CONTAINS_TO_EQUI_JOIN, LeftJoinArrayContainsToInnerJoinStrategy.class);
}

public static boolean isRewriteLeftJoinNullFilterToSemiJoinEnabled(Session session)
{
return session.getSystemProperty(REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public class FeaturesConfig
private PushDownFilterThroughCrossJoinStrategy pushDownFilterExpressionEvaluationThroughCrossJoin = PushDownFilterThroughCrossJoinStrategy.REWRITTEN_TO_INNER_JOIN;
private boolean rewriteCrossJoinWithOrFilterToInnerJoin = true;
private boolean rewriteCrossJoinWithArrayContainsFilterToInnerJoin = true;
private LeftJoinArrayContainsToInnerJoinStrategy leftJoinWithArrayContainsToEquiJoinStrategy = LeftJoinArrayContainsToInnerJoinStrategy.DISABLED;
private boolean rewriteCrossJoinWithArrayNotContainsFilterToAntiJoin = true;
private JoinNotNullInferenceStrategy joinNotNullInferenceStrategy = NONE;
private boolean leftJoinNullFilterToSemiJoin = true;
Expand Down Expand Up @@ -424,6 +425,13 @@ public enum JoinNotNullInferenceStrategy
USE_FUNCTION_METADATA
}

// TODO: Implement cost based strategy
public enum LeftJoinArrayContainsToInnerJoinStrategy
{
DISABLED,
ALWAYS_ENABLED
}

public double getCpuCostWeight()
{
return cpuCostWeight;
Expand Down Expand Up @@ -2746,6 +2754,19 @@ public FeaturesConfig setRewriteCrossJoinWithArrayContainsFilterToInnerJoin(bool
return this;
}

public LeftJoinArrayContainsToInnerJoinStrategy getLeftJoinWithArrayContainsToEquiJoinStrategy()
{
return leftJoinWithArrayContainsToEquiJoinStrategy;
}

@Config("optimizer.left-join-with-array-contains-to-equi-join-strategy")
@ConfigDescription("When to apply rewrite left join with array contains to equi join")
public FeaturesConfig setLeftJoinWithArrayContainsToEquiJoinStrategy(LeftJoinArrayContainsToInnerJoinStrategy leftJoinWithArrayContainsToEquiJoinStrategy)
{
this.leftJoinWithArrayContainsToEquiJoinStrategy = leftJoinWithArrayContainsToEquiJoinStrategy;
return this;
}

public boolean isRewriteCrossJoinWithArrayNotContainsFilterToAntiJoin()
{
return this.rewriteCrossJoinWithArrayNotContainsFilterToAntiJoin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.facebook.presto.sql.planner.iterative.rule.InlineProjections;
import com.facebook.presto.sql.planner.iterative.rule.InlineSqlFunctions;
import com.facebook.presto.sql.planner.iterative.rule.LeftJoinNullFilterToSemiJoin;
import com.facebook.presto.sql.planner.iterative.rule.LeftJoinWithArrayContainsToEquiJoinCondition;
import com.facebook.presto.sql.planner.iterative.rule.MergeDuplicateAggregation;
import com.facebook.presto.sql.planner.iterative.rule.MergeFilters;
import com.facebook.presto.sql.planner.iterative.rule.MergeLimitWithDistinct;
Expand Down Expand Up @@ -522,6 +523,13 @@ public PlanOptimizers(
new CrossJoinWithOrFilterToInnerJoin(metadata.getFunctionAndTypeManager()),
new CrossJoinWithArrayContainsToInnerJoin(metadata.getFunctionAndTypeManager()),
new CrossJoinWithArrayNotContainsToAntiJoin(metadata, metadata.getFunctionAndTypeManager()))),
new IterativeOptimizer(
metadata,
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(
new LeftJoinWithArrayContainsToEquiJoinCondition(metadata.getFunctionAndTypeManager()))),
new IterativeOptimizer(
metadata,
ruleStats,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.iterative.rule;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.analyzer.FeaturesConfig.LeftJoinArrayContainsToInnerJoinStrategy;
import com.facebook.presto.sql.planner.PlannerUtils;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.UnnestNode;
import com.facebook.presto.sql.relational.FunctionResolution;
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.getLeftJoinArrayContainsToInnerJoinStrategy;
import static com.facebook.presto.expressions.LogicalRowExpressions.and;
import static com.facebook.presto.expressions.LogicalRowExpressions.extractConjuncts;
import static com.facebook.presto.sql.planner.VariablesExtractor.extractAll;
import static com.facebook.presto.sql.planner.plan.Patterns.join;
import static com.facebook.presto.sql.relational.Expressions.call;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

/**
* When the join condition of a left join has pattern of contains(array, element) where array, we can rewrite it as a equi join condition. For example:
* <pre>
* - Left Join
* empty join clause
* filter: contains(r_array, l_key)
* - scan l
* - scan r
* </pre>
* into:
* <pre>
* - Left Join
* l_key = field
* - scan l
* - Unnest
* field <- unnest distinct_array
* - project
* distinct_array := remove_nulls(array_distinct(r_array))
* - scan r
* r_array
* </pre>
*/
public class LeftJoinWithArrayContainsToEquiJoinCondition
implements Rule<JoinNode>
{
private static final Pattern<JoinNode> PATTERN = join().matching(x -> x.getType().equals(JoinNode.Type.LEFT) && x.getCriteria().isEmpty() && x.getFilter().isPresent());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the join criteria need to be empty? if we have a query like this

r left join s on (a=b and contains(...))

wouldn't we benefit from applying the same transformation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this case, the two tables will be partitioned on a and b and is already an equi-join. The benefit of apply this transformation will be unknown.

private final FunctionAndTypeManager functionAndTypeManager;
private final RowExpressionDeterminismEvaluator determinismEvaluator;
private final FunctionResolution functionResolution;

public LeftJoinWithArrayContainsToEquiJoinCondition(FunctionAndTypeManager functionAndTypeManager)
{
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
this.determinismEvaluator = new RowExpressionDeterminismEvaluator(functionAndTypeManager);
this.functionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver());
}

@Override
public Pattern<JoinNode> getPattern()
{
return PATTERN;
}

@Override
public boolean isEnabled(Session session)
{
// TODO: implement cost based with HBO
return getLeftJoinArrayContainsToInnerJoinStrategy(session).equals(LeftJoinArrayContainsToInnerJoinStrategy.ALWAYS_ENABLED);
}

@Override
public Result apply(JoinNode node, Captures captures, Context context)
{
RowExpression filterPredicate = node.getFilter().get();
List<VariableReferenceExpression> leftInput = node.getLeft().getOutputVariables();
List<VariableReferenceExpression> rightInput = node.getRight().getOutputVariables();
List<RowExpression> andConjuncts = extractConjuncts(filterPredicate);
Optional<RowExpression> arrayContains = andConjuncts.stream().filter(rowExpression -> isSupportedJoinCondition(rowExpression, leftInput, rightInput)).findFirst();
if (!arrayContains.isPresent()) {
return Result.empty();
}
List<RowExpression> remainingConjuncts = andConjuncts.stream().filter(rowExpression -> !rowExpression.equals(arrayContains.get())).collect(toImmutableList());
RowExpression array = ((CallExpression) arrayContains.get()).getArguments().get(0);
RowExpression element = ((CallExpression) arrayContains.get()).getArguments().get(1);
checkState(array.getType() instanceof ArrayType && ((ArrayType) array.getType()).getElementType().equals(element.getType()));

PlanNode newLeft = node.getLeft();
ImmutableMap.Builder<VariableReferenceExpression, RowExpression> leftAssignment = ImmutableMap.builder();
VariableReferenceExpression elementVariable;
if (!(element instanceof VariableReferenceExpression)) {
elementVariable = context.getVariableAllocator().newVariable(element);
leftAssignment.put(elementVariable, element);
newLeft = PlannerUtils.addProjections(node.getLeft(), context.getIdAllocator(), leftAssignment.build());
}
else {
elementVariable = (VariableReferenceExpression) element;
}

CallExpression arrayDistinct = call(functionAndTypeManager, "array_distinct", new ArrayType(element.getType()), array);
CallExpression arrayFilterNull = call(functionAndTypeManager, "remove_nulls", arrayDistinct.getType(), ImmutableList.of(arrayDistinct));
VariableReferenceExpression arrayFilterNullVariable = context.getVariableAllocator().newVariable(arrayFilterNull);
PlanNode newRight = PlannerUtils.addProjections(node.getRight(), context.getIdAllocator(), ImmutableMap.of(arrayFilterNullVariable, arrayFilterNull));
VariableReferenceExpression unnestVariable = context.getVariableAllocator().newVariable("unnest", element.getType());

UnnestNode unnestNode = new UnnestNode(newRight.getSourceLocation(),
context.getIdAllocator().getNextId(),
newRight,
newRight.getOutputVariables(),
ImmutableMap.of(arrayFilterNullVariable, ImmutableList.of(unnestVariable)),
Optional.empty());

JoinNode.EquiJoinClause equiJoinClause = new JoinNode.EquiJoinClause(elementVariable, unnestVariable);

return Result.ofPlanNode(new JoinNode(node.getSourceLocation(),
context.getIdAllocator().getNextId(),
node.getType(),
newLeft,
unnestNode,
ImmutableList.of(equiJoinClause),
node.getOutputVariables(),
remainingConjuncts.isEmpty() ? Optional.empty() : Optional.of(and(remainingConjuncts)),
Optional.empty(),
Optional.empty(),
node.getDistributionType(),
node.getDynamicFilters()));
}

private boolean isSupportedJoinCondition(RowExpression rowExpression, List<VariableReferenceExpression> leftInput, List<VariableReferenceExpression> rightInput)
{
if (rowExpression instanceof CallExpression && functionResolution.isArrayContainsFunction(((CallExpression) rowExpression).getFunctionHandle())) {
RowExpression arrayExpression = ((CallExpression) rowExpression).getArguments().get(0);
RowExpression elementExpression = ((CallExpression) rowExpression).getArguments().get(1);
return determinismEvaluator.isDeterministic(arrayExpression) && rightInput.containsAll(extractAll(arrayExpression))
&& determinismEvaluator.isDeterministic(elementExpression) && leftInput.containsAll(extractAll(elementExpression));
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationIfToFilterRewriteStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.LeftJoinArrayContainsToInnerJoinStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialAggregationStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PushDownFilterThroughCrossJoinStrategy;
Expand Down Expand Up @@ -244,6 +245,7 @@ public void testDefaults()
.setDefaultJoinSelectivityCoefficient(0)
.setRewriteCrossJoinWithOrFilterToInnerJoin(true)
.setRewriteCrossJoinWithArrayContainsFilterToInnerJoin(true)
.setLeftJoinWithArrayContainsToEquiJoinStrategy(LeftJoinArrayContainsToInnerJoinStrategy.DISABLED)
.setRewriteCrossJoinWithArrayNotContainsFilterToAntiJoin(true)
.setLeftJoinNullFilterToSemiJoin(true)
.setBroadcastJoinWithSmallBuildUnknownProbe(false)
Expand Down Expand Up @@ -444,6 +446,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.push-down-filter-expression-evaluation-through-cross-join", "DISABLED")
.put("optimizer.rewrite-cross-join-with-or-filter-to-inner-join", "false")
.put("optimizer.rewrite-cross-join-with-array-contains-filter-to-inner-join", "false")
.put("optimizer.left-join-with-array-contains-to-equi-join-strategy", "ALWAYS_ENABLED")
.put("optimizer.rewrite-cross-join-with-array-not-contains-filter-to-anti-join", "false")
.put("optimizer.default-join-selectivity-coefficient", "0.5")
.put("optimizer.rewrite-left-join-with-null-filter-to-semi-join", "false")
Expand Down Expand Up @@ -644,6 +647,7 @@ public void testExplicitPropertyMappings()
.setPushDownFilterExpressionEvaluationThroughCrossJoin(PushDownFilterThroughCrossJoinStrategy.DISABLED)
.setRewriteCrossJoinWithOrFilterToInnerJoin(false)
.setRewriteCrossJoinWithArrayContainsFilterToInnerJoin(false)
.setLeftJoinWithArrayContainsToEquiJoinStrategy(LeftJoinArrayContainsToInnerJoinStrategy.ALWAYS_ENABLED)
.setRewriteCrossJoinWithArrayNotContainsFilterToAntiJoin(false)
.setLeftJoinNullFilterToSemiJoin(false)
.setBroadcastJoinWithSmallBuildUnknownProbe(true)
Expand Down
Loading