Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,10 @@ private static Optional<PlanNodeStatsEstimate> expressionToExpressionEquality(
StatisticRange intersect = leftRange.intersect(rightRange);

double nullsFilterFactor = (1 - leftStats.getNullsFraction()) * (1 - rightStats.getNullsFraction());
double leftFilterFactor = firstNonNaN(leftRange.overlapPercentWith(intersect), 1);
double rightFilterFactor = firstNonNaN(rightRange.overlapPercentWith(intersect), 1);
double leftNdvInRange = leftFilterFactor * leftRange.getDistinctValuesCount();
double rightNdvInRange = rightFilterFactor * rightRange.getDistinctValuesCount();
double filterFactor = 1 * leftFilterFactor * rightFilterFactor / max(leftNdvInRange, rightNdvInRange, 1);
double retainedNdv = min(leftNdvInRange, rightNdvInRange);
double leftNdv = leftRange.getDistinctValuesCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this change more. why is the new way better?

double rightNdv = rightRange.getDistinctValuesCount();
double filterFactor = 1.0 / max(leftNdv, rightNdv, 1);
double retainedNdv = min(leftNdv, rightNdv);

PlanNodeStatsEstimate.Builder estimate = PlanNodeStatsEstimate.buildFrom(inputStatistics)
.setOutputRowCount(inputStatistics.getOutputRowCount() * nullsFilterFactor * filterFactor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.analyzer.ExpressionAnalyzer;
import com.facebook.presto.sql.analyzer.Scope;
import com.facebook.presto.sql.planner.ExpressionInterpreter;
import com.facebook.presto.sql.planner.LiteralEncoder;
import com.facebook.presto.sql.planner.LiteralInterpreter;
import com.facebook.presto.sql.planner.NoOpSymbolResolver;
import com.facebook.presto.sql.planner.Symbol;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.tree.AstVisitor;
Expand All @@ -33,12 +36,14 @@
import com.facebook.presto.sql.tree.Literal;
import com.facebook.presto.sql.tree.LogicalBinaryExpression;
import com.facebook.presto.sql.tree.Node;
import com.facebook.presto.sql.tree.NodeRef;
import com.facebook.presto.sql.tree.NotExpression;
import com.facebook.presto.sql.tree.SymbolReference;
import com.google.common.collect.ImmutableList;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
Expand All @@ -51,6 +56,7 @@
import static com.facebook.presto.cost.StatsUtil.toStatsRepresentation;
import static com.facebook.presto.cost.SymbolStatsEstimate.UNKNOWN_STATS;
import static com.facebook.presto.cost.SymbolStatsEstimate.ZERO_STATS;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.sql.ExpressionUtils.and;
import static com.facebook.presto.sql.tree.ComparisonExpression.Operator.EQUAL;
import static com.facebook.presto.sql.tree.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
Expand All @@ -61,6 +67,7 @@
import static java.lang.Double.isNaN;
import static java.lang.Double.min;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;

public class FilterStatsCalculator
Expand All @@ -70,12 +77,14 @@ public class FilterStatsCalculator
private final Metadata metadata;
private final ScalarStatsCalculator scalarStatsCalculator;
private final StatsNormalizer normalizer;
private final LiteralEncoder literalEncoder;

public FilterStatsCalculator(Metadata metadata, ScalarStatsCalculator scalarStatsCalculator, StatsNormalizer normalizer)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.scalarStatsCalculator = requireNonNull(scalarStatsCalculator, "scalarStatsCalculator is null");
this.normalizer = requireNonNull(normalizer, "normalizer is null");
this.literalEncoder = new LiteralEncoder(metadata.getBlockEncodingSerde());
}

public PlanNodeStatsEstimate filterStats(
Expand All @@ -84,10 +93,41 @@ public PlanNodeStatsEstimate filterStats(
Session session,
TypeProvider types)
{
return new FilterExpressionStatsCalculatingVisitor(statsEstimate, session, types).process(predicate)
Expression simplifiedExpression = simplifyExpression(session, predicate, types);
return new FilterExpressionStatsCalculatingVisitor(statsEstimate, session, types)
.process(simplifiedExpression)
.orElseGet(() -> normalizer.normalize(filterStatsForUnknownExpression(statsEstimate), types));
}

private Expression simplifyExpression(Session session, Expression predicate, TypeProvider types)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a static method in the SimplifyExpressions and reuse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot do this, because i didn't remove the if (value == null) logic from this method. -- see my answer to https://github.com/prestodb/presto/pull/11267/files#r210729395

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can either have the isTopLevelPredicate flag as a parameter, or at least extract most of the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i did this and it worked great except we lost ability to test NOT (com.facebook.presto.cost.TestFilterStatsCalculator#testNotStats) because com.facebook.presto.sql.planner.iterative.rule.SimplifyExpressions#rewrite pushes NOT into the comparisons.
This reuse is really a good idea, I like it. For now i added a TODO note, since I can't sacrifice the testability and don't want to block PR this longer.

{
// TODO reuse com.facebook.presto.sql.planner.iterative.rule.SimplifyExpressions.rewrite

Map<NodeRef<Expression>, Type> expressionTypes = getExpressionTypes(session, predicate, types);
ExpressionInterpreter interpreter = ExpressionInterpreter.expressionOptimizer(predicate, metadata, session, expressionTypes);
Object value = interpreter.optimize(NoOpSymbolResolver.INSTANCE);

if (value == null) {
// Expression evaluates to SQL null, which in Filter is equivalent to false. This assumes the expression is a top-level expression (eg. not in NOT).
value = false;
}
return literalEncoder.toExpression(value, BOOLEAN);
}

private Map<NodeRef<Expression>, Type> getExpressionTypes(Session session, Expression expression, TypeProvider types)
{
ExpressionAnalyzer expressionAnalyzer = ExpressionAnalyzer.createWithoutSubqueries(
metadata.getFunctionRegistry(),
metadata.getTypeManager(),
session,
types,
emptyList(),
node -> new IllegalStateException("Expected node: %s" + node),
false);
expressionAnalyzer.analyze(expression, Scope.create());
return expressionAnalyzer.getExpressionTypes();
}

private static PlanNodeStatsEstimate filterStatsForUnknownExpression(PlanNodeStatsEstimate inputStatistics)
{
return inputStatistics.mapOutputRowCount(rowCount -> rowCount * UNKNOWN_FILTER_COEFFICIENT);
Expand Down Expand Up @@ -229,10 +269,10 @@ protected Optional<PlanNodeStatsEstimate> visitBetweenPredicate(BetweenPredicate
if (!(node.getValue() instanceof SymbolReference)) {
return visitExpression(node, context);
}
if (!(node.getMin() instanceof Literal || isSingleValue(getExpressionStats(node.getMin())))) {
if (!isSingleValue(node.getMin())) {
return visitExpression(node, context);
}
if (!(node.getMax() instanceof Literal || isSingleValue(getExpressionStats(node.getMax())))) {
if (!isSingleValue(node.getMax())) {
return visitExpression(node, context);
}

Expand Down Expand Up @@ -351,6 +391,12 @@ private Optional<Symbol> asSymbol(Expression expression)
return Optional.empty();
}

private boolean isSingleValue(Expression expression)
{
return (expression instanceof Literal)
|| isSingleValue(getExpressionStats(expression));
}

private boolean isSingleValue(SymbolStatsEstimate stats)
{
return stats.getDistinctValuesCount() == 1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public PlanOptimizers(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new DetermineJoinDistributionType())))); // Must run before AddExchanges
ImmutableSet.of(new DetermineJoinDistributionType(costComparator))))); // Must run before AddExchanges
builder.add(new DetermineSemiJoinDistributionType()); // Must run before AddExchanges
builder.add(
new IterativeOptimizer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@

package com.facebook.presto.sql.planner.iterative.rule;

import com.facebook.presto.cost.CostComparator;
import com.facebook.presto.cost.CostProvider;
import com.facebook.presto.cost.PlanNodeCostEstimate;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.JoinNode.DistributionType;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.google.common.collect.Ordering;

import java.util.ArrayList;
import java.util.List;

import static com.facebook.presto.SystemSessionProperties.getJoinDistributionType;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC;
import static com.facebook.presto.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar;
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED;
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED;
Expand All @@ -29,46 +38,120 @@
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.RIGHT;
import static com.facebook.presto.sql.planner.plan.Patterns.join;
import static java.util.Objects.requireNonNull;

public class DetermineJoinDistributionType
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to have a similar one for the SemiJoin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... we should. But why now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can introduce it as a separate PR. Just in case you can have it stashed somewhere, we can include it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't :(

implements Rule<JoinNode>
{
private static final Pattern<JoinNode> PATTERN = join().matching(joinNode -> !joinNode.getDistributionType().isPresent());

private final CostComparator costComparator;

public DetermineJoinDistributionType(CostComparator costComparator)
{
this.costComparator = requireNonNull(costComparator, "costComparator is null");
}

@Override
public Pattern<JoinNode> getPattern()
{
return PATTERN;
}

@Override
public Result apply(JoinNode node, Captures captures, Context context)
public Result apply(JoinNode joinNode, Captures captures, Context context)
{
JoinDistributionType joinDistributionType = getJoinDistributionType(context.getSession());
if (joinDistributionType == AUTOMATIC) {
return Result.ofPlanNode(getCostBasedJoin(joinNode, context));
}
return Result.ofPlanNode(getSyntacticOrderJoin(joinNode, context, joinDistributionType));
}

private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
{
DistributionType distributionType = determineDistributionType(node, context);
return Result.ofPlanNode(node.withDistributionType(distributionType));
CostProvider costProvider = context.getCostProvider();
List<PlanNodeWithCost> possibleJoinNodes = new ArrayList<>();

if (!mustPartition(joinNode)) {
possibleJoinNodes.add(getJoinNodeWithCost(costProvider, joinNode.withDistributionType(REPLICATED)));
}
if (!mustReplicate(joinNode, context)) {
possibleJoinNodes.add(getJoinNodeWithCost(costProvider, joinNode.withDistributionType(PARTITIONED)));
}

JoinNode flipped = joinNode.flipChildren();
if (!mustPartition(flipped)) {
possibleJoinNodes.add(getJoinNodeWithCost(costProvider, flipped.withDistributionType(REPLICATED)));
}
if (!mustReplicate(flipped, context)) {
possibleJoinNodes.add(getJoinNodeWithCost(costProvider, flipped.withDistributionType(PARTITIONED)));
}

if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) {
return getSyntacticOrderJoin(joinNode, context, AUTOMATIC);
}

// Using Ordering to facilitate rule determinism
Ordering<PlanNodeWithCost> planNodeOrderings = costComparator.forSession(context.getSession()).onResultOf(PlanNodeWithCost::getCost);
return planNodeOrderings.min(possibleJoinNodes).getPlanNode();
}

private static DistributionType determineDistributionType(JoinNode node, Context context)
private PlanNode getSyntacticOrderJoin(JoinNode joinNode, Context context, JoinDistributionType joinDistributionType)
{
JoinNode.Type type = node.getType();
if (type == RIGHT || type == FULL) {
// With REPLICATED, the unmatched rows from right-side would be duplicated.
return PARTITIONED;
if (mustPartition(joinNode)) {
return joinNode.withDistributionType(PARTITIONED);
}
if (mustReplicate(joinNode, context)) {
return joinNode.withDistributionType(REPLICATED);
}
if (joinDistributionType.canPartition()) {
return joinNode.withDistributionType(PARTITIONED);
}
return joinNode.withDistributionType(REPLICATED);
}

private static boolean mustPartition(JoinNode joinNode)
{
JoinNode.Type type = joinNode.getType();
// With REPLICATED, the unmatched rows from right-side would be duplicated.
return type == RIGHT || type == FULL;
}

if (node.getCriteria().isEmpty() && (type == INNER || type == LEFT)) {
private static boolean mustReplicate(JoinNode joinNode, Context context)
{
JoinNode.Type type = joinNode.getType();
if (joinNode.getCriteria().isEmpty() && (type == INNER || type == LEFT)) {
// There is nothing to partition on
return REPLICATED;
return true;
}
return isAtMostScalar(joinNode.getRight(), context.getLookup());
}

private static PlanNodeWithCost getJoinNodeWithCost(CostProvider costProvider, JoinNode possibleJoinNode)
{
return new PlanNodeWithCost(costProvider.getCumulativeCost(possibleJoinNode), possibleJoinNode);
}

private static class PlanNodeWithCost
{
private final PlanNode planNode;
private final PlanNodeCostEstimate cost;

if (isAtMostScalar(node.getRight(), context.getLookup())) {
return REPLICATED;
public PlanNodeWithCost(PlanNodeCostEstimate cost, PlanNode planNode)
{
this.cost = requireNonNull(cost, "cost is null");
this.planNode = requireNonNull(planNode, "planNode is null");
}

if (getJoinDistributionType(context.getSession()).canPartition()) {
return PARTITIONED;
public PlanNode getPlanNode()
{
return planNode;
}

return REPLICATED;
public PlanNodeCostEstimate getCost()
{
return cost;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public Range<Long> visitFilter(FilterNode node, Void context)
return Range.atLeast(0L);
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why get rid of the override?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't understand?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow i totally misread that. you added it.

public Range<Long> visitValues(ValuesNode node, Void context)
{
return Range.singleton((long) node.getRows().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.stream.Stream;

import static com.facebook.presto.sql.planner.SortExpressionExtractor.extractSortExpression;
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED;
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.FULL;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT;
Expand Down Expand Up @@ -106,6 +108,21 @@ public JoinNode(@JsonProperty("id") PlanNodeId id,

checkArgument(!(criteria.isEmpty() && leftHashSymbol.isPresent()), "Left hash symbol is only valid in an equijoin");
checkArgument(!(criteria.isEmpty() && rightHashSymbol.isPresent()), "Right hash symbol is only valid in an equijoin");

if (distributionType.isPresent()) {
// The implementation of full outer join only works if the data is hash partitioned.
checkArgument(
!(distributionType.get() == REPLICATED && (type == RIGHT || type == FULL)),
"%s join do not work with %s distribution type",
type,
distributionType.get());
// It does not make sense to PARTITION when there is nothing to partition on
checkArgument(
!(distributionType.get() == PARTITIONED && criteria.isEmpty() && type != RIGHT && type != FULL),
"Equi criteria are empty, so %s join should not have %s distribution type",
type,
distributionType.get());
}
}

public JoinNode flipChildren()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import java.util.List;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -49,5 +52,14 @@ public static <T, R> List<R> mappedCopy(List<T> elements, Function<T, R> mapper)
.collect(toImmutableList());
}

public static <T> List<T> nElements(int n, IntFunction<T> function)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one of these function that are being only "sometimes" reused even by the author itself. Someone who doesn't know that this very specific function particularly exist - never would try to lookup one, but instead will try to implement either a copy, or just inline the algorithm. I'm not even saying it is available only in a single module. I don't think it is worth adding such a functions.

{
checkArgument(n >= 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a message

requireNonNull(function, "function is null");
return IntStream.range(0, n)
.mapToObj(function)
.collect(toImmutableList());
}

private MoreLists() {}
}
Loading