diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index c9ffc8507f77..42b836584f1c 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -45,6 +45,7 @@ import static io.trino.spi.session.PropertyMetadata.enumProperty; import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey; import static java.lang.Math.min; @@ -56,6 +57,7 @@ public final class SystemSessionProperties public static final String OPTIMIZE_HASH_GENERATION = "optimize_hash_generation"; public static final String JOIN_DISTRIBUTION_TYPE = "join_distribution_type"; public static final String JOIN_MAX_BROADCAST_TABLE_SIZE = "join_max_broadcast_table_size"; + public static final String JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR = "join_multi_clause_independence_factor"; public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join"; public static final String HASH_PARTITION_COUNT = "hash_partition_count"; public static final String GROUPED_EXECUTION = "grouped_execution"; @@ -121,6 +123,7 @@ public final class SystemSessionProperties public static final String IGNORE_STATS_CALCULATOR_FAILURES = "ignore_stats_calculator_failures"; public static final String MAX_DRIVERS_PER_TASK = "max_drivers_per_task"; public static final String DEFAULT_FILTER_FACTOR_ENABLED = "default_filter_factor_enabled"; + public static final String FILTER_CONJUNCTION_INDEPENDENCE_FACTOR = "filter_conjunction_independence_factor"; public static final String SKIP_REDUNDANT_SORT = "skip_redundant_sort"; public static final String ALLOW_PUSHDOWN_INTO_CONNECTORS = "allow_pushdown_into_connectors"; public static final String COMPLEX_EXPRESSION_PUSHDOWN = "complex_expression_pushdown"; @@ -203,6 +206,15 @@ public SystemSessionProperties( "Maximum estimated size of a table that can be broadcast when using automatic join type selection", optimizerConfig.getJoinMaxBroadcastTableSize(), false), + new PropertyMetadata<>( + JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, + "Scales the strength of independence assumption for selectivity estimates of multi-clause joins", + DOUBLE, + Double.class, + optimizerConfig.getJoinMultiClauseIndependenceFactor(), + false, + value -> validateDoubleRange(value, JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, 0.0, 1.0), + value -> value), booleanProperty( DISTRIBUTED_INDEX_JOIN, "Distribute index joins on join keys instead of executing inline", @@ -551,6 +563,15 @@ public SystemSessionProperties( "use a default filter factor for unknown filters in a filter node", optimizerConfig.isDefaultFilterFactorEnabled(), false), + new PropertyMetadata<>( + FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, + "Scales the strength of independence assumption for selectivity estimates of the conjunction of multiple filters", + DOUBLE, + Double.class, + optimizerConfig.getFilterConjunctionIndependenceFactor(), + false, + value -> validateDoubleRange(value, FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, 0.0, 1.0), + value -> value), booleanProperty( SKIP_REDUNDANT_SORT, "Skip redundant sort operations", @@ -756,6 +777,11 @@ public static DataSize getJoinMaxBroadcastTableSize(Session session) return session.getSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, DataSize.class); } + public static double getJoinMultiClauseIndependenceFactor(Session session) + { + return session.getSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, Double.class); + } + public static boolean isDistributedIndexJoinEnabled(Session session) { return session.getSystemProperty(DISTRIBUTED_INDEX_JOIN, Boolean.class); @@ -1103,6 +1129,17 @@ private static Integer validateIntegerValue(Object value, String property, int l return intValue; } + private static double validateDoubleRange(Object value, String property, double lowerBoundIncluded, double upperBoundIncluded) + { + double doubleValue = (double) value; + if (doubleValue < lowerBoundIncluded || doubleValue > upperBoundIncluded) { + throw new TrinoException( + INVALID_SESSION_PROPERTY, + format("%s must be in the range [%.2f, %.2f]: %.2f", property, lowerBoundIncluded, upperBoundIncluded, doubleValue)); + } + return doubleValue; + } + public static boolean isStatisticsCpuTimerEnabled(Session session) { return session.getSystemProperty(STATISTICS_CPU_TIMER_ENABLED, Boolean.class); @@ -1133,6 +1170,11 @@ public static boolean isDefaultFilterFactorEnabled(Session session) return session.getSystemProperty(DEFAULT_FILTER_FACTOR_ENABLED, Boolean.class); } + public static double getFilterConjunctionIndependenceFactor(Session session) + { + return session.getSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, Double.class); + } + public static boolean isSkipRedundantSort(Session session) { return session.getSystemProperty(SKIP_REDUNDANT_SORT, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/cost/ComparisonStatsCalculator.java b/core/trino-main/src/main/java/io/trino/cost/ComparisonStatsCalculator.java index 96afc39c2c06..91b8dbac60ba 100644 --- a/core/trino-main/src/main/java/io/trino/cost/ComparisonStatsCalculator.java +++ b/core/trino-main/src/main/java/io/trino/cost/ComparisonStatsCalculator.java @@ -20,7 +20,7 @@ import java.util.OptionalDouble; import static io.trino.cost.SymbolStatsEstimate.buildFrom; -import static io.trino.util.MoreMath.firstNonNaN; +import static io.trino.util.MoreMath.averageExcludingNaNs; import static io.trino.util.MoreMath.max; import static io.trino.util.MoreMath.min; import static java.lang.Double.NEGATIVE_INFINITY; @@ -239,15 +239,4 @@ private static PlanNodeStatsEstimate estimateExpressionNotEqualToExpression( rightExpressionSymbol.ifPresent(symbol -> result.addSymbolStatistics(symbol, rightNullsFiltered)); return result.build(); } - - private static double averageExcludingNaNs(double first, double second) - { - if (isNaN(first) && isNaN(second)) { - return NaN; - } - if (!isNaN(first) && !isNaN(second)) { - return (first + second) / 2; - } - return firstNonNaN(first, second); - } } diff --git a/core/trino-main/src/main/java/io/trino/cost/FilterStatsCalculator.java b/core/trino-main/src/main/java/io/trino/cost/FilterStatsCalculator.java index 1f9b04e990b9..f80055075ea2 100644 --- a/core/trino-main/src/main/java/io/trino/cost/FilterStatsCalculator.java +++ b/core/trino-main/src/main/java/io/trino/cost/FilterStatsCalculator.java @@ -14,8 +14,10 @@ package io.trino.cost; import com.google.common.base.VerifyException; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ListMultimap; import io.trino.Session; import io.trino.execution.warnings.WarningCollector; import io.trino.security.AllowAllAccessControl; @@ -44,23 +46,29 @@ import io.trino.sql.tree.NodeRef; import io.trino.sql.tree.NotExpression; import io.trino.sql.tree.SymbolReference; +import io.trino.util.DisjointSet; import javax.annotation.Nullable; import javax.inject.Inject; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalDouble; +import java.util.Set; +import java.util.stream.IntStream; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.SystemSessionProperties.getFilterConjunctionIndependenceFactor; import static io.trino.cost.ComparisonStatsCalculator.estimateExpressionToExpressionComparison; import static io.trino.cost.ComparisonStatsCalculator.estimateExpressionToLiteralComparison; import static io.trino.cost.PlanNodeStatsEstimateMath.addStatsAndSumDistinctValues; import static io.trino.cost.PlanNodeStatsEstimateMath.capStats; +import static io.trino.cost.PlanNodeStatsEstimateMath.estimateCorrelatedConjunctionRowCount; +import static io.trino.cost.PlanNodeStatsEstimateMath.intersectCorrelatedStats; import static io.trino.cost.PlanNodeStatsEstimateMath.subtractSubsetStats; import static io.trino.spi.statistics.StatsUtil.toStatsRepresentation; import static io.trino.spi.type.BooleanType.BOOLEAN; @@ -68,6 +76,7 @@ import static io.trino.sql.ExpressionUtils.and; import static io.trino.sql.ExpressionUtils.getExpressionTypes; import static io.trino.sql.planner.ExpressionInterpreter.evaluateConstantExpression; +import static io.trino.sql.planner.SymbolsExtractor.extractUnique; import static io.trino.sql.tree.ComparisonExpression.Operator.EQUAL; import static io.trino.sql.tree.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL; import static io.trino.sql.tree.ComparisonExpression.Operator.LESS_THAN_OR_EQUAL; @@ -137,7 +146,14 @@ private class FilterExpressionStatsCalculatingVisitor @Override public PlanNodeStatsEstimate process(Node node, @Nullable Void context) { - return normalizer.normalize(super.process(node, context), types); + PlanNodeStatsEstimate output; + if (input.getOutputRowCount() == 0 || input.isOutputRowCountUnknown()) { + output = input; + } + else { + output = super.process(node, context); + } + return normalizer.normalize(output, types); } @Override @@ -169,35 +185,56 @@ protected PlanNodeStatsEstimate visitLogicalExpression(LogicalExpression node, V private PlanNodeStatsEstimate estimateLogicalAnd(List terms) { - // first try to estimate in the fair way - PlanNodeStatsEstimate estimate = process(terms.get(0)); - if (!estimate.isOutputRowCountUnknown()) { - for (int i = 1; i < terms.size(); i++) { - estimate = new FilterExpressionStatsCalculatingVisitor(estimate, session, types).process(terms.get(i)); + double filterConjunctionIndependenceFactor = getFilterConjunctionIndependenceFactor(session); + List estimates = estimateCorrelatedExpressions(terms, filterConjunctionIndependenceFactor); + double outputRowCount = estimateCorrelatedConjunctionRowCount( + input, + estimates, + filterConjunctionIndependenceFactor); + if (isNaN(outputRowCount)) { + return PlanNodeStatsEstimate.unknown(); + } + return normalizer.normalize(new PlanNodeStatsEstimate(outputRowCount, intersectCorrelatedStats(estimates)), types); + } - if (estimate.isOutputRowCountUnknown()) { - break; + /** + * There can be multiple predicate expressions for the same symbol, e.g. x > 0 AND x <= 1, x BETWEEN 1 AND 10. + * We attempt to detect such cases in extractCorrelatedGroups and calculate a combined estimate for each + * such group of expressions. This is done so that we don't apply the above scaling factors when combining estimates + * from conjunction of multiple predicates on the same symbol and underestimate the output. + **/ + private List estimateCorrelatedExpressions(List terms, double filterConjunctionIndependenceFactor) + { + ImmutableList.Builder estimatesBuilder = ImmutableList.builder(); + boolean hasUnestimatedTerm = false; + for (List correlatedExpressions : extractCorrelatedGroups(terms, filterConjunctionIndependenceFactor)) { + PlanNodeStatsEstimate combinedEstimate = PlanNodeStatsEstimate.unknown(); + for (Expression expression : correlatedExpressions) { + PlanNodeStatsEstimate estimate; + // combinedEstimate is unknown until the 1st known estimated term + if (combinedEstimate.isOutputRowCountUnknown()) { + estimate = process(expression); + } + else { + estimate = new FilterExpressionStatsCalculatingVisitor(combinedEstimate, session, types) + .process(expression); } - } - if (!estimate.isOutputRowCountUnknown()) { - return estimate; + if (estimate.isOutputRowCountUnknown()) { + hasUnestimatedTerm = true; + } + else { + // update combinedEstimate only when the term estimate is known so that all the known estimates + // can be applied progressively through FilterExpressionStatsCalculatingVisitor calls. + combinedEstimate = estimate; + } } + estimatesBuilder.add(combinedEstimate); } - - // If some of the filters cannot be estimated, take the smallest estimate. - // Apply 0.9 filter factor as "unknown filter" factor. - Optional smallest = terms.stream() - .map(this::process) - .filter(termEstimate -> !termEstimate.isOutputRowCountUnknown()) - .sorted(Comparator.comparingDouble(PlanNodeStatsEstimate::getOutputRowCount)) - .findFirst(); - - if (smallest.isEmpty()) { - return PlanNodeStatsEstimate.unknown(); + if (hasUnestimatedTerm) { + estimatesBuilder.add(PlanNodeStatsEstimate.unknown()); } - - return smallest.get().mapOutputRowCount(rowCount -> rowCount * UNKNOWN_FILTER_COEFFICIENT); + return estimatesBuilder.build(); } private PlanNodeStatsEstimate estimateLogicalOr(List terms) @@ -442,4 +479,53 @@ private OptionalDouble doubleValueFromLiteral(Type type, Expression literal) return toStatsRepresentation(type, literalValue); } } + + private static List> extractCorrelatedGroups(List terms, double filterConjunctionIndependenceFactor) + { + if (filterConjunctionIndependenceFactor == 1) { + // Allows the filters to be estimated as if there is no correlation between any of the terms + return ImmutableList.of(terms); + } + + ListMultimap expressionUniqueSymbols = ArrayListMultimap.create(); + terms.forEach(expression -> expressionUniqueSymbols.putAll(expression, extractUnique(expression))); + // Partition symbols into disjoint sets such that the symbols belonging to different disjoint sets + // do not appear together in any expression. + DisjointSet symbolsPartitioner = new DisjointSet<>(); + for (Expression term : terms) { + List expressionSymbols = expressionUniqueSymbols.get(term); + if (expressionSymbols.isEmpty()) { + continue; + } + // Ensure that symbol is added to DisjointSet when there is only one symbol in the list + symbolsPartitioner.find(expressionSymbols.get(0)); + for (int i = 1; i < expressionSymbols.size(); i++) { + symbolsPartitioner.findAndUnion(expressionSymbols.get(0), expressionSymbols.get(i)); + } + } + + // Use disjoint sets of symbols to partition the given list of expressions + List> symbolPartitions = ImmutableList.copyOf(symbolsPartitioner.getEquivalentClasses()); + checkState(symbolPartitions.size() <= terms.size(), "symbolPartitions size exceeds number of expressions"); + ListMultimap expressionPartitions = ArrayListMultimap.create(); + for (Expression term : terms) { + List expressionSymbols = expressionUniqueSymbols.get(term); + int expressionPartitionId; + if (expressionSymbols.isEmpty()) { + expressionPartitionId = symbolPartitions.size(); // For expressions with no symbols + } + else { + Symbol symbol = expressionSymbols.get(0); // Lookup any symbol to find the partition id + expressionPartitionId = IntStream.range(0, symbolPartitions.size()) + .filter(partition -> symbolPartitions.get(partition).contains(symbol)) + .findFirst() + .orElseThrow(); + } + expressionPartitions.put(expressionPartitionId, term); + } + + return expressionPartitions.keySet().stream() + .map(expressionPartitions::get) + .collect(toImmutableList()); + } } diff --git a/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java b/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java index 79bb2eded80f..19495492d9b8 100644 --- a/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java +++ b/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java @@ -26,19 +26,21 @@ import io.trino.util.MoreMath; import java.util.Collection; -import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Queue; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Sets.difference; +import static io.trino.SystemSessionProperties.getJoinMultiClauseIndependenceFactor; import static io.trino.cost.FilterStatsCalculator.UNKNOWN_FILTER_COEFFICIENT; +import static io.trino.cost.PlanNodeStatsEstimateMath.estimateCorrelatedConjunctionRowCount; import static io.trino.cost.SymbolStatsEstimate.buildFrom; import static io.trino.sql.ExpressionUtils.extractConjuncts; import static io.trino.sql.planner.plan.Patterns.join; import static io.trino.sql.tree.ComparisonExpression.Operator.EQUAL; +import static io.trino.util.MoreMath.firstNonNaN; import static java.lang.Double.NaN; import static java.lang.Double.isNaN; import static java.lang.Math.min; @@ -180,84 +182,64 @@ private PlanNodeStatsEstimate filterByEquiJoinClauses( TypeProvider types) { checkArgument(!clauses.isEmpty(), "clauses is empty"); - PlanNodeStatsEstimate result = PlanNodeStatsEstimate.unknown(); - // Join equality clauses are usually correlated. Therefore we shouldn't treat each join equality - // clause separately because stats estimates would be way off. Instead we choose so called - // "driving clause" which mostly reduces join output rows cardinality and apply UNKNOWN_FILTER_COEFFICIENT - // for other (auxiliary) clauses. - Queue remainingClauses = new LinkedList<>(clauses); - EquiJoinClause drivingClause = remainingClauses.poll(); - for (int i = 0; i < clauses.size(); i++) { - PlanNodeStatsEstimate estimate = filterByEquiJoinClauses(stats, drivingClause, remainingClauses, session, types); - if (result.isOutputRowCountUnknown() || (!estimate.isOutputRowCountUnknown() && estimate.getOutputRowCount() < result.getOutputRowCount())) { - result = estimate; - } - remainingClauses.add(drivingClause); - drivingClause = remainingClauses.poll(); - } - - return result; - } + // Join equality clauses are usually correlated. Therefore, we shouldn't treat each join equality + // clause separately because stats estimates would be way off. + List knownEstimates = clauses.stream() + .map(clause -> { + ComparisonExpression predicate = new ComparisonExpression(EQUAL, clause.getLeft().toSymbolReference(), clause.getRight().toSymbolReference()); + return new PlanNodeStatsEstimateWithClause(filterStatsCalculator.filterStats(stats, predicate, session, types), clause); + }) + .collect(toImmutableList()); - private PlanNodeStatsEstimate filterByEquiJoinClauses( - PlanNodeStatsEstimate stats, - EquiJoinClause drivingClause, - Collection remainingClauses, - Session session, - TypeProvider types) - { - ComparisonExpression drivingPredicate = new ComparisonExpression(EQUAL, drivingClause.getLeft().toSymbolReference(), drivingClause.getRight().toSymbolReference()); - PlanNodeStatsEstimate filteredStats = filterStatsCalculator.filterStats(stats, drivingPredicate, session, types); - for (EquiJoinClause clause : remainingClauses) { - filteredStats = filterByAuxiliaryClause(filteredStats, clause, types); + double outputRowCount = estimateCorrelatedConjunctionRowCount( + stats, + knownEstimates.stream().map(PlanNodeStatsEstimateWithClause::getEstimate).collect(toImmutableList()), + getJoinMultiClauseIndependenceFactor(session)); + if (isNaN(outputRowCount)) { + return PlanNodeStatsEstimate.unknown(); } - return filteredStats; + return normalizer.normalize(new PlanNodeStatsEstimate(outputRowCount, intersectCorrelatedJoinClause(stats, knownEstimates)), types); } - private PlanNodeStatsEstimate filterByAuxiliaryClause(PlanNodeStatsEstimate stats, EquiJoinClause clause, TypeProvider types) - { - // we just clear null fraction and adjust ranges here - // selectivity is mostly handled by driving clause. We just scale heuristically by UNKNOWN_FILTER_COEFFICIENT here. - - SymbolStatsEstimate leftStats = stats.getSymbolStatistics(clause.getLeft()); - SymbolStatsEstimate rightStats = stats.getSymbolStatistics(clause.getRight()); - StatisticRange leftRange = StatisticRange.from(leftStats); - StatisticRange rightRange = StatisticRange.from(rightStats); - - StatisticRange intersect = leftRange.intersect(rightRange); - double leftFilterValue = firstNonNaN(leftRange.overlapPercentWith(intersect), 1); - double rightFilterValue = firstNonNaN(rightRange.overlapPercentWith(intersect), 1); - double leftNdvInRange = leftFilterValue * leftRange.getDistinctValuesCount(); - double rightNdvInRange = rightFilterValue * rightRange.getDistinctValuesCount(); - double retainedNdv = MoreMath.min(leftNdvInRange, rightNdvInRange); - - SymbolStatsEstimate newLeftStats = buildFrom(leftStats) - .setNullsFraction(0) - .setStatisticsRange(intersect) - .setDistinctValuesCount(retainedNdv) - .build(); - - SymbolStatsEstimate newRightStats = buildFrom(rightStats) - .setNullsFraction(0) - .setStatisticsRange(intersect) - .setDistinctValuesCount(retainedNdv) - .build(); - - PlanNodeStatsEstimate.Builder result = PlanNodeStatsEstimate.buildFrom(stats) - .setOutputRowCount(stats.getOutputRowCount() * UNKNOWN_FILTER_COEFFICIENT) - .addSymbolStatistics(clause.getLeft(), newLeftStats) - .addSymbolStatistics(clause.getRight(), newRightStats); - return normalizer.normalize(result.build(), types); - } - - private static double firstNonNaN(double... values) + private static Map intersectCorrelatedJoinClause( + PlanNodeStatsEstimate stats, + List equiJoinClauseEstimates) { - for (double value : values) { - if (!isNaN(value)) { - return value; - } + // Add initial statistics (including stats for columns which are not part of equi-join clauses) + PlanNodeStatsEstimate.Builder result = PlanNodeStatsEstimate.builder() + .addSymbolStatistics(stats.getSymbolStatistics()); + + for (PlanNodeStatsEstimateWithClause estimateWithClause : equiJoinClauseEstimates) { + EquiJoinClause clause = estimateWithClause.getClause(); + // we just clear null fraction and adjust ranges here, selectivity is handled outside this function + SymbolStatsEstimate leftStats = stats.getSymbolStatistics(clause.getLeft()); + SymbolStatsEstimate rightStats = stats.getSymbolStatistics(clause.getRight()); + StatisticRange leftRange = StatisticRange.from(leftStats); + StatisticRange rightRange = StatisticRange.from(rightStats); + + StatisticRange intersect = leftRange.intersect(rightRange); + double leftFilterValue = firstNonNaN(leftRange.overlapPercentWith(intersect), 1); + double rightFilterValue = firstNonNaN(rightRange.overlapPercentWith(intersect), 1); + double leftNdvInRange = leftFilterValue * leftRange.getDistinctValuesCount(); + double rightNdvInRange = rightFilterValue * rightRange.getDistinctValuesCount(); + double retainedNdv = MoreMath.min(leftNdvInRange, rightNdvInRange); + + SymbolStatsEstimate newLeftStats = buildFrom(leftStats) + .setNullsFraction(0) + .setStatisticsRange(intersect) + .setDistinctValuesCount(retainedNdv) + .build(); + + SymbolStatsEstimate newRightStats = buildFrom(rightStats) + .setNullsFraction(0) + .setStatisticsRange(intersect) + .setDistinctValuesCount(retainedNdv) + .build(); + + result.addSymbolStatistics(clause.getLeft(), newLeftStats) + .addSymbolStatistics(clause.getRight(), newRightStats); } - throw new IllegalArgumentException("All values are NaN"); + return result.build().getSymbolStatistics(); } /** @@ -410,4 +392,26 @@ private List flippedCriteria(JoinNode node) .map(EquiJoinClause::flip) .collect(toImmutableList()); } + + private static class PlanNodeStatsEstimateWithClause + { + private final PlanNodeStatsEstimate estimate; + private final EquiJoinClause clause; + + private PlanNodeStatsEstimateWithClause(PlanNodeStatsEstimate estimate, EquiJoinClause clause) + { + this.estimate = requireNonNull(estimate, "estimate is null"); + this.clause = requireNonNull(clause, "clause is null"); + } + + private PlanNodeStatsEstimate getEstimate() + { + return estimate; + } + + private EquiJoinClause getClause() + { + return clause; + } + } } diff --git a/core/trino-main/src/main/java/io/trino/cost/PlanNodeStatsEstimateMath.java b/core/trino-main/src/main/java/io/trino/cost/PlanNodeStatsEstimateMath.java index 05b09f8e0b4c..b692884a6460 100644 --- a/core/trino-main/src/main/java/io/trino/cost/PlanNodeStatsEstimateMath.java +++ b/core/trino-main/src/main/java/io/trino/cost/PlanNodeStatsEstimateMath.java @@ -13,11 +13,20 @@ */ package io.trino.cost; +import io.trino.sql.planner.Symbol; +import io.trino.util.MoreMath; + +import java.util.List; +import java.util.Map; + import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cost.FilterStatsCalculator.UNKNOWN_FILTER_COEFFICIENT; import static java.lang.Double.NaN; import static java.lang.Double.isNaN; import static java.lang.Double.max; import static java.lang.Double.min; +import static java.util.Comparator.comparingDouble; import static java.util.stream.Stream.concat; public final class PlanNodeStatsEstimateMath @@ -135,6 +144,83 @@ public static PlanNodeStatsEstimate capStats(PlanNodeStatsEstimate stats, PlanNo return result.build(); } + public static Map intersectCorrelatedStats(List estimates) + { + checkArgument(!estimates.isEmpty(), "estimates is empty"); + if (estimates.size() == 1) { + return estimates.get(0).getSymbolStatistics(); + } + PlanNodeStatsEstimate.Builder result = PlanNodeStatsEstimate.builder(); + // Update statistic range for symbols + estimates.stream().flatMap(estimate -> estimate.getSymbolsWithKnownStatistics().stream()) + .distinct() + .forEach(symbol -> { + List symbolStatsEstimates = estimates.stream() + .map(estimate -> estimate.getSymbolStatistics(symbol)) + .collect(toImmutableList()); + + StatisticRange intersect = symbolStatsEstimates.stream() + .map(StatisticRange::from) + .reduce(StatisticRange::intersect) + .orElseThrow(); + + // intersectCorrelatedStats should try to produce stats as if filters are applied in sequence. + // Using min works for filters like (a > 10 AND b < 10), but won't work for + // (a > 10 AND b IS NULL). However, former case is more common. + double nullsFraction = symbolStatsEstimates.stream() + .map(SymbolStatsEstimate::getNullsFraction) + .reduce(MoreMath::minExcludeNaN) + .orElseThrow(); + + double averageRowSize = symbolStatsEstimates.stream() + .map(SymbolStatsEstimate::getAverageRowSize) + .reduce(MoreMath::averageExcludingNaNs) + .orElseThrow(); + + result.addSymbolStatistics(symbol, SymbolStatsEstimate.builder() + .setStatisticsRange(intersect) + .setNullsFraction(nullsFraction) + .setAverageRowSize(averageRowSize) + .build()); + }); + return result.build().getSymbolStatistics(); + } + + public static double estimateCorrelatedConjunctionRowCount( + PlanNodeStatsEstimate input, + List estimates, + double independenceFactor) + { + checkArgument(!estimates.isEmpty(), "estimates is empty"); + if (input.isOutputRowCountUnknown() || input.getOutputRowCount() == 0) { + return input.getOutputRowCount(); + } + List knownSortedEstimates = estimates.stream() + .filter(estimateInfo -> !estimateInfo.isOutputRowCountUnknown()) + .sorted(comparingDouble(PlanNodeStatsEstimate::getOutputRowCount)) + .collect(toImmutableList()); + if (knownSortedEstimates.isEmpty()) { + return NaN; + } + + PlanNodeStatsEstimate combinedEstimate = knownSortedEstimates.get(0); + double combinedSelectivity = combinedEstimate.getOutputRowCount() / input.getOutputRowCount(); + double combinedIndependenceFactor = 1.0; + // For independenceFactor = 0.75 and terms t1, t2, t3 + // Combined selectivity = (t1 selectivity) * ((t2 selectivity) ^ 0.75) * ((t3 selectivity) ^ (0.75 * 0.75)) + // independenceFactor = 1 implies the terms are assumed to have no correlation and their selectivities are multiplied without scaling. + // independenceFactor = 0 implies the terms are assumed to be fully correlated and only the most selective term drives the selectivity. + for (int i = 1; i < knownSortedEstimates.size(); i++) { + PlanNodeStatsEstimate term = knownSortedEstimates.get(i); + combinedIndependenceFactor *= independenceFactor; + combinedSelectivity *= Math.pow(term.getOutputRowCount() / input.getOutputRowCount(), combinedIndependenceFactor); + } + double outputRowCount = input.getOutputRowCount() * combinedSelectivity; + // TODO use UNKNOWN_FILTER_COEFFICIENT only when default-filter-factor is enabled + boolean hasUnestimatedTerm = estimates.stream().anyMatch(PlanNodeStatsEstimate::isOutputRowCountUnknown); + return hasUnestimatedTerm ? outputRowCount * UNKNOWN_FILTER_COEFFICIENT : outputRowCount; + } + private static PlanNodeStatsEstimate createZeroStats(PlanNodeStatsEstimate stats) { PlanNodeStatsEstimate.Builder result = PlanNodeStatsEstimate.builder(); diff --git a/core/trino-main/src/main/java/io/trino/cost/StatisticRange.java b/core/trino-main/src/main/java/io/trino/cost/StatisticRange.java index 98398ef14350..60eb988a4802 100644 --- a/core/trino-main/src/main/java/io/trino/cost/StatisticRange.java +++ b/core/trino-main/src/main/java/io/trino/cost/StatisticRange.java @@ -17,6 +17,8 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.util.MoreMath.maxExcludeNaN; +import static io.trino.util.MoreMath.minExcludeNaN; import static java.lang.Double.NaN; import static java.lang.Double.isFinite; import static java.lang.Double.isInfinite; @@ -173,28 +175,6 @@ public StatisticRange addAndCollapseDistinctValues(StatisticRange other) return new StatisticRange(minExcludeNaN(low, other.low), maxExcludeNaN(high, other.high), newDistinctValues); } - private static double minExcludeNaN(double v1, double v2) - { - if (isNaN(v1)) { - return v2; - } - if (isNaN(v2)) { - return v1; - } - return min(v1, v2); - } - - private static double maxExcludeNaN(double v1, double v2) - { - if (isNaN(v1)) { - return v2; - } - if (isNaN(v2)) { - return v1; - } - return max(v1, v2); - } - @Override public boolean equals(Object o) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java index a6d950333e88..827f2ae3d4d6 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java @@ -19,6 +19,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; +import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -34,6 +35,7 @@ public class OptimizerConfig private DataSize joinMaxBroadcastTableSize = DataSize.of(100, MEGABYTE); private JoinDistributionType joinDistributionType = JoinDistributionType.AUTOMATIC; + private double joinMultiClauseIndependenceFactor = 0.25; private JoinReorderingStrategy joinReorderingStrategy = JoinReorderingStrategy.AUTOMATIC; private int maxReorderedJoins = 9; @@ -43,6 +45,7 @@ public class OptimizerConfig private boolean collectPlanStatisticsForAllQueries; private boolean ignoreStatsCalculatorFailures = true; private boolean defaultFilterFactorEnabled; + private double filterConjunctionIndependenceFactor = 0.75; private boolean colocatedJoinsEnabled; private boolean distributedIndexJoinsEnabled; @@ -164,6 +167,21 @@ public OptimizerConfig setJoinMaxBroadcastTableSize(DataSize joinMaxBroadcastTab return this; } + @Min(0) + @Max(1) + public double getJoinMultiClauseIndependenceFactor() + { + return joinMultiClauseIndependenceFactor; + } + + @Config("optimizer.join-multi-clause-independence-factor") + @ConfigDescription("Scales the strength of independence assumption for selectivity estimates of multi-clause joins") + public OptimizerConfig setJoinMultiClauseIndependenceFactor(double joinMultiClauseIndependenceFactor) + { + this.joinMultiClauseIndependenceFactor = joinMultiClauseIndependenceFactor; + return this; + } + public JoinReorderingStrategy getJoinReorderingStrategy() { return joinReorderingStrategy; @@ -254,6 +272,21 @@ public OptimizerConfig setDefaultFilterFactorEnabled(boolean defaultFilterFactor return this; } + @Min(0) + @Max(1) + public double getFilterConjunctionIndependenceFactor() + { + return filterConjunctionIndependenceFactor; + } + + @Config("optimizer.filter-conjunction-independence-factor") + @ConfigDescription("Scales the strength of independence assumption for selectivity estimates of the conjunction of multiple filters") + public OptimizerConfig setFilterConjunctionIndependenceFactor(double filterConjunctionIndependenceFactor) + { + this.filterConjunctionIndependenceFactor = filterConjunctionIndependenceFactor; + return this; + } + public boolean isColocatedJoinsEnabled() { return colocatedJoinsEnabled; diff --git a/core/trino-main/src/main/java/io/trino/util/MoreMath.java b/core/trino-main/src/main/java/io/trino/util/MoreMath.java index 582c8640c6cb..06820b18b384 100644 --- a/core/trino-main/src/main/java/io/trino/util/MoreMath.java +++ b/core/trino-main/src/main/java/io/trino/util/MoreMath.java @@ -15,6 +15,7 @@ import java.util.stream.DoubleStream; +import static java.lang.Double.NaN; import static java.lang.Double.isNaN; public final class MoreMath @@ -110,4 +111,37 @@ public static double firstNonNaN(double... values) } throw new IllegalArgumentException("All values are NaN"); } + + public static double averageExcludingNaNs(double first, double second) + { + if (isNaN(first) && isNaN(second)) { + return NaN; + } + if (!isNaN(first) && !isNaN(second)) { + return (first + second) / 2; + } + return firstNonNaN(first, second); + } + + public static double minExcludeNaN(double v1, double v2) + { + if (isNaN(v1)) { + return v2; + } + if (isNaN(v2)) { + return v1; + } + return min(v1, v2); + } + + public static double maxExcludeNaN(double v1, double v2) + { + if (isNaN(v1)) { + return v2; + } + if (isNaN(v2)) { + return v1; + } + return max(v1, v2); + } } diff --git a/core/trino-main/src/test/java/io/trino/cost/StatsCalculatorTester.java b/core/trino-main/src/test/java/io/trino/cost/StatsCalculatorTester.java index 55cc4c9a16ec..48cdcda1fdad 100644 --- a/core/trino-main/src/test/java/io/trino/cost/StatsCalculatorTester.java +++ b/core/trino-main/src/test/java/io/trino/cost/StatsCalculatorTester.java @@ -67,6 +67,11 @@ private static LocalQueryRunner createQueryRunner(Session session) } public StatsCalculatorAssertion assertStatsFor(Function planProvider) + { + return assertStatsFor(session, planProvider); + } + + public StatsCalculatorAssertion assertStatsFor(Session session, Function planProvider) { PlanBuilder planBuilder = new PlanBuilder(new PlanNodeIdAllocator(), metadata, session); PlanNode planNode = planProvider.apply(planBuilder); diff --git a/core/trino-main/src/test/java/io/trino/cost/TestFilterStatsCalculator.java b/core/trino-main/src/test/java/io/trino/cost/TestFilterStatsCalculator.java index 7e2c0c9aab2e..b814aa82a3b7 100644 --- a/core/trino-main/src/test/java/io/trino/cost/TestFilterStatsCalculator.java +++ b/core/trino-main/src/test/java/io/trino/cost/TestFilterStatsCalculator.java @@ -27,6 +27,9 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.function.Consumer; + +import static io.trino.SystemSessionProperties.FILTER_CONJUNCTION_INDEPENDENCE_FACTOR; import static io.trino.sql.ExpressionTestUtils.planExpression; import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT; import static io.trino.sql.planner.TypeAnalyzer.createTestingTypeAnalyzer; @@ -257,6 +260,13 @@ public void testUnsupportedExpression() @Test public void testAndStats() { + // unknown input + assertExpression("x < 0e0 AND x < 1e0", PlanNodeStatsEstimate.unknown()).outputRowsCountUnknown(); + assertExpression("x < 0e0 AND y < 1e0", PlanNodeStatsEstimate.unknown()).outputRowsCountUnknown(); + // zeroStatistics input + assertExpression("x < 0e0 AND x < 1e0", zeroStatistics).equalTo(zeroStatistics); + assertExpression("x < 0e0 AND y < 1e0", zeroStatistics).equalTo(zeroStatistics); + assertExpression("x < 0e0 AND x > 1e0").equalTo(zeroStatistics); assertExpression("x < 0e0 AND x > DOUBLE '-7.5'") @@ -301,6 +311,128 @@ public void testAndStats() assertExpression("CAST(NULL AS boolean) AND CAST(NULL AS boolean)").equalTo(zeroStatistics); assertExpression("CAST(NULL AS boolean) AND (x < 0e0 AND x > 1e0)").equalTo(zeroStatistics); + + Consumer symbolAssertX = symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(-5.0) + .highValue(5.0) + .distinctValuesCount(20.0) + .nullsFraction(0.0); + Consumer symbolAssertY = symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(1.0) + .highValue(5.0) + .distinctValuesCount(16.0) + .nullsFraction(0.0); + + double inputRowCount = standardInputStatistics.getOutputRowCount(); + double filterSelectivityX = 0.375; + double inequalityFilterSelectivityY = 0.4; + assertExpression( + "(x BETWEEN -5 AND 5) AND y > 1", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0").build()) + .outputRowsCount(filterSelectivityX * inputRowCount) + .symbolStats("x", symbolAssertX) + .symbolStats("y", symbolAssertY); + + assertExpression( + "(x BETWEEN -5 AND 5) AND y > 1", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "1").build()) + .outputRowsCount(filterSelectivityX * inequalityFilterSelectivityY * inputRowCount) + .symbolStats("x", symbolAssertX) + .symbolStats("y", symbolAssertY); + + assertExpression( + "(x BETWEEN -5 AND 5) AND y > 1", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0.5").build()) + .outputRowsCount(filterSelectivityX * (Math.pow(inequalityFilterSelectivityY, 0.5)) * inputRowCount) + .symbolStats("x", symbolAssertX) + .symbolStats("y", symbolAssertY); + + double nullFilterSelectivityY = 0.5; + assertExpression( + "(x BETWEEN -5 AND 5) AND y IS NULL", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "1").build()) + .outputRowsCount(filterSelectivityX * nullFilterSelectivityY * inputRowCount) + .symbolStats("x", symbolAssertX) + .symbolStats("y", symbolAssert -> symbolAssert.isEqualTo(SymbolStatsEstimate.zero())); + + assertExpression( + "(x BETWEEN -5 AND 5) AND y IS NULL", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0.5").build()) + .outputRowsCount(filterSelectivityX * Math.pow(nullFilterSelectivityY, 0.5) * inputRowCount) + .symbolStats("x", symbolAssertX) + .symbolStats("y", symbolAssert -> symbolAssert.isEqualTo(SymbolStatsEstimate.zero())); + + assertExpression( + "(x BETWEEN -5 AND 5) AND y IS NULL", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0").build()) + .outputRowsCount(filterSelectivityX * inputRowCount) + .symbolStats("x", symbolAssertX) + .symbolStats("y", symbolAssert -> symbolAssert.isEqualTo(SymbolStatsEstimate.zero())); + + assertExpression( + "y < 1 AND 0 < y", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0.5").build()) + .outputRowsCount(100) + .symbolStats("y", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(1.0) + .distinctValuesCount(4.0) + .nullsFraction(0.0)); + + assertExpression( + "x > 0 AND (y < 1 OR y > 2)", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0.5").build()) + .outputRowsCount(filterSelectivityX * (Math.pow(inequalityFilterSelectivityY, 0.5)) * inputRowCount) + .symbolStats("x", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(10.0) + .distinctValuesCount(20.0) + .nullsFraction(0.0)) + .symbolStats("y", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(5.0) + .distinctValuesCount(16.0) + .nullsFraction(0.0)); + + assertExpression( + "x > 0 AND (x < 1 OR y > 1)", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0.5").build()) + .outputRowsCount(172.0) + .symbolStats("x", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(10.0) + .distinctValuesCount(20.0) + .nullsFraction(0.0)) + .symbolStats("y", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(5.0) + .distinctValuesCount(20.0) + .nullsFraction(0.1053779069)); + + assertExpression( + "x IN (0, 1, 2) AND (x = 0 OR (x = 1 AND y = 1) OR (x = 2 AND y = 1))", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0.5").build()) + .outputRowsCount(20.373798) + .symbolStats("x", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(2.0) + .distinctValuesCount(2.623798) + .nullsFraction(0.0)) + .symbolStats("y", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(5.0) + .distinctValuesCount(15.686298) + .nullsFraction(0.2300749269)); + + assertExpression( + "x > 0 AND CAST(NULL AS boolean)", + Session.builder(session).setSystemProperty(FILTER_CONJUNCTION_INDEPENDENCE_FACTOR, "0.5").build()) + .outputRowsCount(filterSelectivityX * inputRowCount * 0.9) + .symbolStats("x", symbolAssert -> symbolAssert.averageRowSize(4.0) + .lowValue(0.0) + .highValue(10.0) + .distinctValuesCount(20.0) + .nullsFraction(0.0)); } @Test @@ -573,16 +705,26 @@ public void testInPredicateFilter() private PlanNodeStatsAssertion assertExpression(String expression) { - return assertExpression(planExpression(PLANNER_CONTEXT, session, standardTypes, expression(expression))); + return assertExpression(expression, session); + } + + private PlanNodeStatsAssertion assertExpression(String expression, PlanNodeStatsEstimate inputStatistics) + { + return assertExpression(planExpression(PLANNER_CONTEXT, session, standardTypes, expression(expression)), session, inputStatistics); + } + + private PlanNodeStatsAssertion assertExpression(String expression, Session session) + { + return assertExpression(planExpression(PLANNER_CONTEXT, session, standardTypes, expression(expression)), session, standardInputStatistics); } - private PlanNodeStatsAssertion assertExpression(Expression expression) + private PlanNodeStatsAssertion assertExpression(Expression expression, Session session, PlanNodeStatsEstimate inputStatistics) { return transaction(new TestingTransactionManager(), new AllowAllAccessControl()) .singleStatement() .execute(session, transactionSession -> { return PlanNodeStatsAssertion.assertThat(statsCalculator.filterStats( - standardInputStatistics, + inputStatistics, expression, transactionSession, standardTypes)); diff --git a/core/trino-main/src/test/java/io/trino/cost/TestJoinStatsRule.java b/core/trino-main/src/test/java/io/trino/cost/TestJoinStatsRule.java index fb707244118b..5bf42da51c7b 100644 --- a/core/trino-main/src/test/java/io/trino/cost/TestJoinStatsRule.java +++ b/core/trino-main/src/test/java/io/trino/cost/TestJoinStatsRule.java @@ -18,14 +18,18 @@ import io.trino.spi.type.Type; import io.trino.sql.planner.Symbol; import io.trino.sql.planner.TypeProvider; +import io.trino.sql.planner.iterative.rule.test.PlanBuilder; import io.trino.sql.planner.plan.JoinNode; import io.trino.sql.planner.plan.JoinNode.EquiJoinClause; +import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.tree.ComparisonExpression; import io.trino.sql.tree.LongLiteral; import org.testng.annotations.Test; import java.util.Optional; +import java.util.function.Function; +import static io.trino.SystemSessionProperties.JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR; import static io.trino.cost.FilterStatsCalculator.UNKNOWN_FILTER_COEFFICIENT; import static io.trino.cost.PlanNodeStatsAssertion.assertThat; import static io.trino.spi.type.BigintType.BIGINT; @@ -36,6 +40,7 @@ import static io.trino.sql.planner.plan.JoinNode.Type.INNER; import static io.trino.sql.planner.plan.JoinNode.Type.LEFT; import static io.trino.sql.planner.plan.JoinNode.Type.RIGHT; +import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.Double.NaN; import static org.testng.Assert.assertEquals; @@ -114,26 +119,28 @@ public void testStatsForInnerJoin() @Test public void testStatsForInnerJoinWithRepeatedClause() { - double innerJoinRowCount = LEFT_ROWS_COUNT * RIGHT_ROWS_COUNT / LEFT_JOIN_COLUMN_NDV * LEFT_JOIN_COLUMN_NON_NULLS * RIGHT_JOIN_COLUMN_NON_NULLS // driver join clause - * UNKNOWN_FILTER_COEFFICIENT; // auxiliary join clause + double clauseSelectivity = 1.0 / LEFT_JOIN_COLUMN_NDV * LEFT_JOIN_COLUMN_NON_NULLS * RIGHT_JOIN_COLUMN_NON_NULLS; + double innerJoinRowCount = LEFT_ROWS_COUNT * RIGHT_ROWS_COUNT * clauseSelectivity * Math.pow(clauseSelectivity, 0.5); PlanNodeStatsEstimate innerJoinStats = planNodeStats( innerJoinRowCount, symbolStatistics(LEFT_JOIN_COLUMN, 5.0, 20.0, 0.0, RIGHT_JOIN_COLUMN_NDV), symbolStatistics(RIGHT_JOIN_COLUMN, 5.0, 20.0, 0.0, RIGHT_JOIN_COLUMN_NDV), LEFT_OTHER_COLUMN_STATS, RIGHT_OTHER_COLUMN_STATS); - tester().assertStatsFor(pb -> { - Symbol leftJoinColumnSymbol = pb.symbol(LEFT_JOIN_COLUMN, BIGINT); - Symbol rightJoinColumnSymbol = pb.symbol(RIGHT_JOIN_COLUMN, DOUBLE); - Symbol leftOtherColumnSymbol = pb.symbol(LEFT_OTHER_COLUMN, BIGINT); - Symbol rightOtherColumnSymbol = pb.symbol(RIGHT_OTHER_COLUMN, DOUBLE); - return pb - .join( - INNER, - pb.values(leftJoinColumnSymbol, leftOtherColumnSymbol), - pb.values(rightJoinColumnSymbol, rightOtherColumnSymbol), - new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol), new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol)); - }).withSourceStats(0, LEFT_STATS) + tester().assertStatsFor( + testSessionBuilder().setSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, "0.5").build(), + pb -> { + Symbol leftJoinColumnSymbol = pb.symbol(LEFT_JOIN_COLUMN, BIGINT); + Symbol rightJoinColumnSymbol = pb.symbol(RIGHT_JOIN_COLUMN, DOUBLE); + Symbol leftOtherColumnSymbol = pb.symbol(LEFT_OTHER_COLUMN, BIGINT); + Symbol rightOtherColumnSymbol = pb.symbol(RIGHT_OTHER_COLUMN, DOUBLE); + return pb.join( + INNER, + pb.values(leftJoinColumnSymbol, leftOtherColumnSymbol), + pb.values(rightJoinColumnSymbol, rightOtherColumnSymbol), + new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol), new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol)); + }) + .withSourceStats(0, LEFT_STATS) .withSourceStats(1, RIGHT_STATS) .check(stats -> stats.equalTo(innerJoinStats)); } @@ -141,60 +148,78 @@ public void testStatsForInnerJoinWithRepeatedClause() @Test public void testStatsForInnerJoinWithTwoEquiClauses() { - double innerJoinRowCount = - LEFT_ROWS_COUNT * RIGHT_ROWS_COUNT / LEFT_JOIN_COLUMN_2_NDV * LEFT_JOIN_COLUMN_2_NON_NULLS * RIGHT_JOIN_COLUMN_2_NON_NULLS // driver join clause - * UNKNOWN_FILTER_COEFFICIENT; // auxiliary join clause - PlanNodeStatsEstimate innerJoinStats = planNodeStats(innerJoinRowCount, + double crossJoinRowCount = LEFT_ROWS_COUNT * RIGHT_ROWS_COUNT; + PlanNodeStatsEstimate innerJoinStats = planNodeStats(crossJoinRowCount, symbolStatistics(LEFT_JOIN_COLUMN, 5.0, 20.0, 0.0, RIGHT_JOIN_COLUMN_NDV), symbolStatistics(RIGHT_JOIN_COLUMN, 5.0, 20.0, 0.0, RIGHT_JOIN_COLUMN_NDV), symbolStatistics(LEFT_JOIN_COLUMN_2, 100.0, 200.0, 0.0, RIGHT_JOIN_COLUMN_2_NDV), symbolStatistics(RIGHT_JOIN_COLUMN_2, 100.0, 200.0, 0.0, RIGHT_JOIN_COLUMN_2_NDV)); - tester().assertStatsFor(pb -> { + Function planProvider = pb -> { Symbol leftJoinColumnSymbol = pb.symbol(LEFT_JOIN_COLUMN, BIGINT); Symbol rightJoinColumnSymbol = pb.symbol(RIGHT_JOIN_COLUMN, DOUBLE); Symbol leftJoinColumnSymbol2 = pb.symbol(LEFT_JOIN_COLUMN_2, BIGINT); Symbol rightJoinColumnSymbol2 = pb.symbol(RIGHT_JOIN_COLUMN_2, DOUBLE); - return pb - .join( - INNER, - pb.values(leftJoinColumnSymbol, leftJoinColumnSymbol2), - pb.values(rightJoinColumnSymbol, rightJoinColumnSymbol2), - new EquiJoinClause(leftJoinColumnSymbol2, rightJoinColumnSymbol2), new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol)); - }).withSourceStats(0, planNodeStats(LEFT_ROWS_COUNT, LEFT_JOIN_COLUMN_STATS, LEFT_JOIN_COLUMN_2_STATS)) + return pb.join( + INNER, + pb.values(leftJoinColumnSymbol, leftJoinColumnSymbol2), + pb.values(rightJoinColumnSymbol, rightJoinColumnSymbol2), + new EquiJoinClause(leftJoinColumnSymbol2, rightJoinColumnSymbol2), new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol)); + }; + + // LEFT_JOIN_COLUMN_2 = RIGHT_JOIN_COLUMN_2 is the more selective clause + double firstClauseSelectivity = 1.0 / LEFT_JOIN_COLUMN_2_NDV * LEFT_JOIN_COLUMN_2_NON_NULLS * RIGHT_JOIN_COLUMN_2_NON_NULLS; + tester().assertStatsFor(testSessionBuilder().setSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, "0").build(), planProvider) + .withSourceStats(0, planNodeStats(LEFT_ROWS_COUNT, LEFT_JOIN_COLUMN_STATS, LEFT_JOIN_COLUMN_2_STATS)) .withSourceStats(1, planNodeStats(RIGHT_ROWS_COUNT, RIGHT_JOIN_COLUMN_STATS, RIGHT_JOIN_COLUMN_2_STATS)) - .check(stats -> stats.equalTo(innerJoinStats)); + .check(stats -> stats.equalTo(innerJoinStats.mapOutputRowCount(rowCount -> rowCount * firstClauseSelectivity))); + + double secondClauseSelectivity = 1.0 / LEFT_JOIN_COLUMN_NDV * LEFT_JOIN_COLUMN_NON_NULLS * RIGHT_JOIN_COLUMN_NON_NULLS; + tester().assertStatsFor(testSessionBuilder().setSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, "1").build(), planProvider) + .withSourceStats(0, planNodeStats(LEFT_ROWS_COUNT, LEFT_JOIN_COLUMN_STATS, LEFT_JOIN_COLUMN_2_STATS)) + .withSourceStats(1, planNodeStats(RIGHT_ROWS_COUNT, RIGHT_JOIN_COLUMN_STATS, RIGHT_JOIN_COLUMN_2_STATS)) + .check(stats -> stats.equalTo(innerJoinStats.mapOutputRowCount(rowCount -> rowCount * firstClauseSelectivity * secondClauseSelectivity))); + + tester().assertStatsFor(testSessionBuilder().setSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, "0.5").build(), planProvider) + .withSourceStats(0, planNodeStats(LEFT_ROWS_COUNT, LEFT_JOIN_COLUMN_STATS, LEFT_JOIN_COLUMN_2_STATS)) + .withSourceStats(1, planNodeStats(RIGHT_ROWS_COUNT, RIGHT_JOIN_COLUMN_STATS, RIGHT_JOIN_COLUMN_2_STATS)) + .check(stats -> stats.equalTo(innerJoinStats.mapOutputRowCount( + rowCount -> rowCount * firstClauseSelectivity * Math.pow(secondClauseSelectivity, 0.5)))); } @Test public void testStatsForInnerJoinWithTwoEquiClausesAndNonEqualityFunction() { - double innerJoinRowCount = - LEFT_ROWS_COUNT * RIGHT_ROWS_COUNT / LEFT_JOIN_COLUMN_2_NDV * LEFT_JOIN_COLUMN_2_NON_NULLS * RIGHT_JOIN_COLUMN_2_NON_NULLS // driver join clause - * UNKNOWN_FILTER_COEFFICIENT // auxiliary join clause - * 0.3333333333; // LEFT_JOIN_COLUMN < 10 non equality filter + // LEFT_JOIN_COLUMN_2 = RIGHT_JOIN_COLUMN_2 is the more selective clause + double firstClauseSelectivity = 1.0 / LEFT_JOIN_COLUMN_2_NDV * LEFT_JOIN_COLUMN_2_NON_NULLS * RIGHT_JOIN_COLUMN_2_NON_NULLS; + double secondClauseSelectivity = 1.0 / LEFT_JOIN_COLUMN_NDV * LEFT_JOIN_COLUMN_NON_NULLS * RIGHT_JOIN_COLUMN_NON_NULLS; + double innerJoinRowCount = LEFT_ROWS_COUNT * RIGHT_ROWS_COUNT * firstClauseSelectivity + * Math.pow(secondClauseSelectivity, 0.5) + * 0.3333333333; // LEFT_JOIN_COLUMN < 10 non equality filter PlanNodeStatsEstimate innerJoinStats = planNodeStats(innerJoinRowCount, symbolStatistics(LEFT_JOIN_COLUMN, 5.0, 10.0, 0.0, RIGHT_JOIN_COLUMN_NDV * 0.3333333333), symbolStatistics(RIGHT_JOIN_COLUMN, 5.0, 20.0, 0.0, RIGHT_JOIN_COLUMN_NDV), symbolStatistics(LEFT_JOIN_COLUMN_2, 100.0, 200.0, 0.0, RIGHT_JOIN_COLUMN_2_NDV), symbolStatistics(RIGHT_JOIN_COLUMN_2, 100.0, 200.0, 0.0, RIGHT_JOIN_COLUMN_2_NDV)); - tester().assertStatsFor(pb -> { - Symbol leftJoinColumnSymbol = pb.symbol(LEFT_JOIN_COLUMN, BIGINT); - Symbol rightJoinColumnSymbol = pb.symbol(RIGHT_JOIN_COLUMN, DOUBLE); - Symbol leftJoinColumnSymbol2 = pb.symbol(LEFT_JOIN_COLUMN_2, BIGINT); - Symbol rightJoinColumnSymbol2 = pb.symbol(RIGHT_JOIN_COLUMN_2, DOUBLE); - ComparisonExpression leftJoinColumnLessThanTen = new ComparisonExpression(ComparisonExpression.Operator.LESS_THAN, leftJoinColumnSymbol.toSymbolReference(), new LongLiteral("10")); - return pb - .join( - INNER, - pb.values(leftJoinColumnSymbol, leftJoinColumnSymbol2), - pb.values(rightJoinColumnSymbol, rightJoinColumnSymbol2), - ImmutableList.of(new EquiJoinClause(leftJoinColumnSymbol2, rightJoinColumnSymbol2), new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol)), - ImmutableList.of(leftJoinColumnSymbol, leftJoinColumnSymbol2), - ImmutableList.of(rightJoinColumnSymbol, rightJoinColumnSymbol2), - Optional.of(leftJoinColumnLessThanTen)); - }).withSourceStats(0, planNodeStats(LEFT_ROWS_COUNT, LEFT_JOIN_COLUMN_STATS, LEFT_JOIN_COLUMN_2_STATS)) + tester().assertStatsFor( + testSessionBuilder().setSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, "0.5").build(), + pb -> { + Symbol leftJoinColumnSymbol = pb.symbol(LEFT_JOIN_COLUMN, BIGINT); + Symbol rightJoinColumnSymbol = pb.symbol(RIGHT_JOIN_COLUMN, DOUBLE); + Symbol leftJoinColumnSymbol2 = pb.symbol(LEFT_JOIN_COLUMN_2, BIGINT); + Symbol rightJoinColumnSymbol2 = pb.symbol(RIGHT_JOIN_COLUMN_2, DOUBLE); + ComparisonExpression leftJoinColumnLessThanTen = new ComparisonExpression(ComparisonExpression.Operator.LESS_THAN, leftJoinColumnSymbol.toSymbolReference(), new LongLiteral("10")); + return pb.join( + INNER, + pb.values(leftJoinColumnSymbol, leftJoinColumnSymbol2), + pb.values(rightJoinColumnSymbol, rightJoinColumnSymbol2), + ImmutableList.of(new EquiJoinClause(leftJoinColumnSymbol2, rightJoinColumnSymbol2), new EquiJoinClause(leftJoinColumnSymbol, rightJoinColumnSymbol)), + ImmutableList.of(leftJoinColumnSymbol, leftJoinColumnSymbol2), + ImmutableList.of(rightJoinColumnSymbol, rightJoinColumnSymbol2), + Optional.of(leftJoinColumnLessThanTen)); + }) + .withSourceStats(0, planNodeStats(LEFT_ROWS_COUNT, LEFT_JOIN_COLUMN_STATS, LEFT_JOIN_COLUMN_2_STATS)) .withSourceStats(1, planNodeStats(RIGHT_ROWS_COUNT, RIGHT_JOIN_COLUMN_STATS, RIGHT_JOIN_COLUMN_2_STATS)) .check(stats -> stats.equalTo(innerJoinStats)); } @@ -336,6 +361,34 @@ public void testAddJoinComplementStats() .equalTo(addedStats); } + @Test + public void testUnknownInputStats() + { + assertJoinStats(INNER, PlanNodeStatsEstimate.unknown(), RIGHT_STATS, PlanNodeStatsEstimate.unknown()); + assertJoinStats(INNER, LEFT_STATS, PlanNodeStatsEstimate.unknown(), PlanNodeStatsEstimate.unknown()); + assertJoinStats(INNER, PlanNodeStatsEstimate.unknown(), PlanNodeStatsEstimate.unknown(), PlanNodeStatsEstimate.unknown()); + } + + @Test + public void testZeroInputStats() + { + PlanNodeStatsEstimate zeroLeftStats = planNodeStats(0, + new SymbolStatistics(LEFT_JOIN_COLUMN, SymbolStatsEstimate.zero()), + new SymbolStatistics(LEFT_OTHER_COLUMN, SymbolStatsEstimate.zero())); + PlanNodeStatsEstimate zeroRightStats = planNodeStats(0, + new SymbolStatistics(RIGHT_JOIN_COLUMN, SymbolStatsEstimate.zero()), + new SymbolStatistics(RIGHT_OTHER_COLUMN, SymbolStatsEstimate.zero())); + PlanNodeStatsEstimate zeroResultStats = planNodeStats(0, + new SymbolStatistics(LEFT_JOIN_COLUMN, SymbolStatsEstimate.zero()), + new SymbolStatistics(LEFT_OTHER_COLUMN, SymbolStatsEstimate.zero()), + new SymbolStatistics(RIGHT_JOIN_COLUMN, SymbolStatsEstimate.zero()), + new SymbolStatistics(RIGHT_OTHER_COLUMN, SymbolStatsEstimate.zero())); + + assertJoinStats(INNER, zeroLeftStats, RIGHT_STATS, zeroResultStats); + assertJoinStats(INNER, LEFT_STATS, zeroRightStats, zeroResultStats); + assertJoinStats(INNER, zeroLeftStats, zeroRightStats, zeroResultStats); + } + private void assertJoinStats(JoinNode.Type joinType, PlanNodeStatsEstimate leftStats, PlanNodeStatsEstimate rightStats, PlanNodeStatsEstimate resultStats) { assertJoinStats(joinType, LEFT_JOIN_COLUMN, LEFT_OTHER_COLUMN, RIGHT_JOIN_COLUMN, RIGHT_OTHER_COLUMN, leftStats, rightStats, resultStats); diff --git a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java index c881b8424a29..70e460502dec 100644 --- a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java +++ b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java @@ -44,6 +44,7 @@ public void testDefaults() .setNetworkCostWeight(15) .setJoinMaxBroadcastTableSize(DataSize.of(100, MEGABYTE)) .setJoinDistributionType(JoinDistributionType.AUTOMATIC) + .setJoinMultiClauseIndependenceFactor(0.25) .setJoinReorderingStrategy(JoinReorderingStrategy.AUTOMATIC) .setMaxReorderedJoins(9) .setDistributedIndexJoinsEnabled(false) @@ -56,6 +57,7 @@ public void testDefaults() .setCollectPlanStatisticsForAllQueries(false) .setIgnoreStatsCalculatorFailures(true) .setDefaultFilterFactorEnabled(false) + .setFilterConjunctionIndependenceFactor(0.75) .setOptimizeMetadataQueries(false) .setOptimizeHashGeneration(true) .setPushTableWriteThroughUnion(true) @@ -95,8 +97,10 @@ public void testExplicitPropertyMappings() .put("collect-plan-statistics-for-all-queries", "true") .put("optimizer.ignore-stats-calculator-failures", "false") .put("optimizer.default-filter-factor-enabled", "true") + .put("optimizer.filter-conjunction-independence-factor", "1.0") .put("join-distribution-type", "BROADCAST") .put("join-max-broadcast-table-size", "42GB") + .put("optimizer.join-multi-clause-independence-factor", "0.75") .put("optimizer.join-reordering-strategy", "NONE") .put("optimizer.max-reordered-joins", "5") .put("iterative-optimizer-timeout", "10s") @@ -141,6 +145,7 @@ public void testExplicitPropertyMappings() .setIgnoreStatsCalculatorFailures(false) .setJoinDistributionType(BROADCAST) .setJoinMaxBroadcastTableSize(DataSize.of(42, GIGABYTE)) + .setJoinMultiClauseIndependenceFactor(0.75) .setJoinReorderingStrategy(NONE) .setMaxReorderedJoins(5) .setIterativeOptimizerTimeout(new Duration(10, SECONDS)) @@ -151,6 +156,7 @@ public void testExplicitPropertyMappings() .setUsePreferredWritePartitioning(false) .setPreferredWritePartitioningMinNumberOfPartitions(10) .setDefaultFilterFactorEnabled(true) + .setFilterConjunctionIndependenceFactor(1.0) .setOptimizeMetadataQueries(true) .setOptimizeHashGeneration(false) .setOptimizeMixedDistinctAggregations(true) diff --git a/docs/src/main/sphinx/admin/properties-optimizer.rst b/docs/src/main/sphinx/admin/properties-optimizer.rst index 974b81a362db..8139bd3ca723 100644 --- a/docs/src/main/sphinx/admin/properties-optimizer.rst +++ b/docs/src/main/sphinx/admin/properties-optimizer.rst @@ -146,3 +146,33 @@ Specifies minimal bucket to task ratio that has to be matched or exceeded in ord to use table scan node partitioning. When the table bucket count is small compared to the number of workers, then the table scan is distributed across all workers for improved parallelism. + +``optimizer.filter-conjunction-independence-factor`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-double` +* **Default value:** ``0.75`` +* **Min allowed value:** ``0`` +* **Max allowed value:** ``1`` + +Scales the strength of independence assumption for estimating the selectivity of +the conjunction of multiple predicates. Lower values for this property will produce +more conservative estimates by assuming a greater degree of correlation between the +columns of the predicates in a conjunction. A value of ``0`` results in the +optimizer assuming that the columns of the predicates are fully correlated and only +the most selective predicate drives the selectivity of a conjunction of predicates. + +``optimizer.join-multi-clause-independence-factor`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-double` +* **Default value:** ``0.25`` +* **Min allowed value:** ``0`` +* **Max allowed value:** ``1`` + +Scales the strength of independence assumption for estimating the output of a +multi-clause join. Lower values for this property will produce more +conservative estimates by assuming a greater degree of correlation between the +columns of the clauses in a join. A value of ``0`` results in the optimizer +assuming that the columns of the join clauses are fully correlated and only +the most selective clause drives the selectivity of the join. diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestShowStats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestShowStats.java index 539263829a71..8f2222b668f6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestShowStats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestShowStats.java @@ -704,11 +704,11 @@ public void testShowStatsWithView() assertQuery( "SHOW STATS FOR (SELECT * FROM nation_view WHERE regionkey = 0)", "VALUES " + - " ('nationkey', null, 1, 0, null, 0, 24), " + - " ('name', 7.08, 1, 0, null, null, null), " + - " ('comment', 74.28, 1, 0, null, null, null), " + - " ('regionkey', null, 1, 0, null, 0, 0), " + - " (null, null, null, null, 1, null, null)"); + " ('nationkey', null, 0.29906975624424414, 0, null, 0, 24), " + + " ('name', 2.1174138742092485, 0.29906975624424414, 0, null, null, null), " + + " ('comment', 22.214901493822456, 0.29906975624424414, 0, null, null, null), " + + " ('regionkey', null, 0.29906975624424414, 0, null, 0, 0), " + + " (null, null, null, null, 0.29906975624424414, null, null)"); assertUpdate("DROP VIEW nation_view"); } diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q17.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q17.plan.txt index 17e8c737bf31..49c98c0b8dda 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q17.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q17.plan.txt @@ -5,33 +5,33 @@ local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["i_item_desc", "i_item_id", "s_state"]) partial aggregation over (i_item_desc, i_item_id, s_state) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk"]) - join (INNER, REPLICATED): + remote exchange (REPARTITION, HASH, ["i_item_sk"]) + scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["cs_item_sk"]) join (INNER, REPLICATED): - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk", "ss_ticket_number"]) - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + join (INNER, REPLICATED): + join (INNER, REPLICATED): + scan catalog_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk", "ss_ticket_number"]) + join (INNER, REPLICATED): + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["sr_customer_sk", "sr_item_sk", "sr_ticket_number"]) + join (INNER, REPLICATED): + scan store_returns + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["sr_customer_sk", "sr_item_sk", "sr_ticket_number"]) - join (INNER, REPLICATED): - scan store_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan store - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan item - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cs_bill_customer_sk", "cs_item_sk"]) - join (INNER, REPLICATED): - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q19.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q19.plan.txt index c500acf6fca3..5b7f87c8d34c 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q19.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q19.plan.txt @@ -6,24 +6,24 @@ local exchange (GATHER, SINGLE, []) partial aggregation over (i_brand, i_brand_id, i_manufact, i_manufact_id) join (INNER, REPLICATED): join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ca_address_sk"]) - scan customer_address - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["c_current_addr_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_customer_sk"]) - scan customer - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_customer_sk"]) + remote exchange (REPARTITION, HASH, ["c_current_addr_sk"]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["c_customer_sk"]) + scan customer + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ss_customer_sk"]) + join (INNER, REPLICATED): join (INNER, REPLICATED): - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + scan store_sales local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ca_address_sk"]) + scan customer_address local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan store diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q24.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q24.plan.txt index b7ad65ef0335..6d9dec2fe505 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q24.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q24.plan.txt @@ -11,29 +11,29 @@ remote exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["c_first_name", "c_last_name", "ca_state", "i_color", "i_current_price", "i_manager_id", "i_size", "i_units", "s_state", "s_store_name"]) partial aggregation over (c_first_name, c_last_name, ca_state, i_color, i_current_price, i_manager_id, i_size, i_units, s_state, s_store_name) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["c_birth_country", "s_zip"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["sr_item_sk", "sr_ticket_number"]) - scan store_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_item_sk", "ss_ticket_number"]) - join (INNER, REPLICATED): - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan item - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan store - local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ca_zip", "upper"]) + scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["c_birth_country", "s_zip"]) + join (INNER, PARTITIONED): remote exchange (REPARTITION, HASH, ["c_customer_sk"]) scan customer - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ca_zip", "upper"]) - scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ss_customer_sk"]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["sr_item_sk", "sr_ticket_number"]) + scan store_returns + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ss_item_sk", "ss_ticket_number"]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan store local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) final aggregation over () diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q25.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q25.plan.txt index b1ead3c1672a..6e2928104027 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q25.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q25.plan.txt @@ -5,33 +5,33 @@ local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["i_item_desc", "i_item_id", "s_store_id", "s_store_name"]) partial aggregation over (i_item_desc, i_item_id, s_store_id, s_store_name) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk"]) + remote exchange (REPARTITION, HASH, ["cs_item_sk"]) join (INNER, REPLICATED): join (INNER, REPLICATED): - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk", "ss_ticket_number"]) - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + join (INNER, REPLICATED): + scan catalog_sales local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["sr_customer_sk", "sr_item_sk", "sr_ticket_number"]) - join (INNER, REPLICATED): - scan store_returns + remote exchange (REPLICATE, BROADCAST, []) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk", "ss_ticket_number"]) + join (INNER, REPLICATED): + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + remote exchange (REPARTITION, HASH, ["sr_customer_sk", "sr_item_sk", "sr_ticket_number"]) + join (INNER, REPLICATED): + scan store_returns + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan store + scan date_dim local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan item + scan store local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cs_bill_customer_sk", "cs_item_sk"]) - join (INNER, REPLICATED): - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + remote exchange (REPARTITION, HASH, ["i_item_sk"]) + scan item diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q29.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q29.plan.txt index 11d8ac74f4a7..6e2928104027 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q29.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q29.plan.txt @@ -5,33 +5,33 @@ local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["i_item_desc", "i_item_id", "s_store_id", "s_store_name"]) partial aggregation over (i_item_desc, i_item_id, s_store_id, s_store_name) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["cs_bill_customer_sk", "cs_item_sk"]) + remote exchange (REPARTITION, HASH, ["cs_item_sk"]) join (INNER, REPLICATED): - scan catalog_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk"]) join (INNER, REPLICATED): join (INNER, REPLICATED): - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk", "ss_ticket_number"]) - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["sr_customer_sk", "sr_item_sk", "sr_ticket_number"]) - join (INNER, REPLICATED): - scan store_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + scan catalog_sales local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan store + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_customer_sk", "ss_item_sk", "ss_ticket_number"]) + join (INNER, REPLICATED): + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["sr_customer_sk", "sr_item_sk", "sr_ticket_number"]) + join (INNER, REPLICATED): + scan store_returns + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan item + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan store + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["i_item_sk"]) + scan item diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q49.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q49.plan.txt index 278d8dae514d..3ac14bb8f136 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q49.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q49.plan.txt @@ -38,21 +38,21 @@ local exchange (GATHER, SINGLE, []) local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan date_dim - remote exchange (REPARTITION, HASH, ["expr_104", "expr_99", "rank_101", "rank_102", "sr_item_sk"]) - partial aggregation over (expr_104, expr_99, rank_101, rank_102, sr_item_sk) + remote exchange (REPARTITION, HASH, ["expr_104", "expr_99", "rank_101", "rank_102", "ss_item_sk"]) + partial aggregation over (expr_104, expr_99, rank_101, rank_102, ss_item_sk) local exchange (GATHER, SINGLE, []) remote exchange (GATHER, SINGLE, []) - final aggregation over (sr_item_sk) + final aggregation over (ss_item_sk) local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["sr_item_sk"]) - partial aggregation over (sr_item_sk) + remote exchange (REPARTITION, HASH, ["ss_item_sk"]) + partial aggregation over (ss_item_sk) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["sr_item_sk", "sr_ticket_number"]) - scan store_returns + remote exchange (REPARTITION, HASH, ["ss_item_sk", "ss_ticket_number"]) + join (INNER, REPLICATED): + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_item_sk", "ss_ticket_number"]) - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + remote exchange (REPARTITION, HASH, ["sr_item_sk", "sr_ticket_number"]) + scan store_returns diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q52.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q52.plan.txt index 33752e693e6c..70aae3fa1bdd 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q52.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q52.plan.txt @@ -9,7 +9,7 @@ local exchange (GATHER, SINGLE, []) scan store_sales local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan item + scan date_dim diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q54.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q54.plan.txt index 697089f8186c..4319fc02faf9 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q54.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q54.plan.txt @@ -15,33 +15,34 @@ local exchange (GATHER, SINGLE, []) scan store_sales local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - join (INNER, REPLICATED): - join (INNER, REPLICATED): - scan customer_address - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - final aggregation over (c_current_addr_sk, c_customer_sk) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["c_current_addr_sk", "c_customer_sk"]) - partial aggregation over (c_current_addr_sk, c_customer_sk) - join (INNER, REPLICATED, can skip output duplicates): - scan customer - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ca_address_sk"]) + join (INNER, REPLICATED): + scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan store + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["c_current_addr_sk"]) + final aggregation over (c_current_addr_sk, c_customer_sk) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["c_current_addr_sk", "c_customer_sk"]) + partial aggregation over (c_current_addr_sk, c_customer_sk) + join (INNER, REPLICATED, can skip output duplicates): + scan customer + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + join (INNER, REPLICATED, can skip output duplicates): join (INNER, REPLICATED, can skip output duplicates): - join (INNER, REPLICATED, can skip output duplicates): - remote exchange (REPARTITION, ROUND_ROBIN, []) - scan catalog_sales - scan web_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan item + remote exchange (REPARTITION, ROUND_ROBIN, []) + scan catalog_sales + scan web_sales local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan store + scan item + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan date_dim diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q55.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q55.plan.txt index 0322599f01e5..f89f2f078b63 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q55.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q55.plan.txt @@ -9,7 +9,7 @@ local exchange (GATHER, SINGLE, []) scan store_sales local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + scan item local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan item + scan date_dim diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q59.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q59.plan.txt index 632c33348697..31a3f601598f 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q59.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q59.plan.txt @@ -1,40 +1,41 @@ local exchange (GATHER, SINGLE, []) remote exchange (GATHER, SINGLE, []) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["d_week_seq", "s_store_id"]) - join (INNER, REPLICATED): - join (INNER, REPLICATED): - final aggregation over (d_week_seq, ss_store_sk) - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_week_seq", "ss_store_sk"]) - partial aggregation over (d_week_seq, ss_store_sk) - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan store + final aggregation over (d_week_seq, ss_store_sk) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["d_week_seq", "ss_store_sk"]) + partial aggregation over (d_week_seq, ss_store_sk) + join (INNER, REPLICATED): + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["expr_171", "s_store_id_108"]) - join (INNER, REPLICATED): - join (INNER, REPLICATED): - final aggregation over (d_week_seq_67, ss_store_sk_47) + remote exchange (REPARTITION, HASH, ["expr_171", "s_store_sk"]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ss_store_sk_47"]) + join (INNER, REPLICATED): + join (INNER, REPLICATED): + final aggregation over (d_week_seq_67, ss_store_sk_47) + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["d_week_seq_67", "ss_store_sk_47"]) + partial aggregation over (d_week_seq_67, ss_store_sk_47) + join (INNER, REPLICATED): + scan store_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["d_week_seq_67", "ss_store_sk_47"]) - partial aggregation over (d_week_seq_67, ss_store_sk_47) - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan store + remote exchange (REPARTITION, HASH, ["s_store_sk_107"]) + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["s_store_id"]) + scan store + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["s_store_id_108"]) + scan store diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q68.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q68.plan.txt index 2ef8a760155b..46b2fc2f8db8 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q68.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q68.plan.txt @@ -7,27 +7,27 @@ local exchange (GATHER, SINGLE, []) scan customer local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["ss_customer_sk"]) - final aggregation over (ca_address_sk, ca_city, ss_customer_sk, ss_ticket_number) + final aggregation over (ca_city, ss_addr_sk, ss_customer_sk, ss_ticket_number) local exchange (GATHER, SINGLE, []) - partial aggregation over (ca_address_sk, ca_city, ss_customer_sk, ss_ticket_number) + partial aggregation over (ca_city, ss_addr_sk, ss_customer_sk, ss_ticket_number) join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ca_address_sk"]) - scan customer_address - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ss_addr_sk"]) + remote exchange (REPARTITION, HASH, ["ss_addr_sk"]) + join (INNER, REPLICATED): join (INNER, REPLICATED): join (INNER, REPLICATED): - join (INNER, REPLICATED): - scan store_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim + scan store_sales local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan store local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan household_demographics + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan household_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["ca_address_sk"]) + scan customer_address local exchange (GATHER, SINGLE, []) remote exchange (REPARTITION, HASH, ["ca_address_sk_2"]) scan customer_address diff --git a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q85.plan.txt b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q85.plan.txt index 209c0a446935..23eed8958c96 100644 --- a/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q85.plan.txt +++ b/testing/trino-benchto-benchmarks/src/test/resources/sql/presto/tpcds/q85.plan.txt @@ -6,33 +6,32 @@ local exchange (GATHER, SINGLE, []) partial aggregation over (r_reason_desc) join (INNER, REPLICATED): join (INNER, REPLICATED): - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["cd_demo_sk_0", "cd_education_status_3", "cd_marital_status_2"]) - scan customer_demographics - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["cd_education_status", "cd_marital_status", "wr_returning_cdemo_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["wr_refunded_addr_sk"]) - join (INNER, PARTITIONED): - remote exchange (REPARTITION, HASH, ["ws_item_sk", "ws_order_number"]) - join (INNER, REPLICATED): - scan web_sales - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan date_dim - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["wr_item_sk", "wr_order_number"]) - join (INNER, REPLICATED): - scan web_returns - local exchange (GATHER, SINGLE, []) - remote exchange (REPLICATE, BROADCAST, []) - scan customer_demographics - local exchange (GATHER, SINGLE, []) - remote exchange (REPARTITION, HASH, ["ca_address_sk"]) - scan customer_address + scan customer_demographics local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) - scan web_page + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ca_address_sk"]) + scan customer_address + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["wr_refunded_addr_sk"]) + join (INNER, REPLICATED): + join (INNER, PARTITIONED): + remote exchange (REPARTITION, HASH, ["ws_item_sk", "ws_order_number"]) + join (INNER, REPLICATED): + scan web_sales + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan date_dim + local exchange (GATHER, SINGLE, []) + remote exchange (REPARTITION, HASH, ["wr_item_sk", "wr_order_number"]) + scan web_returns + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan customer_demographics + local exchange (GATHER, SINGLE, []) + remote exchange (REPLICATE, BROADCAST, []) + scan web_page local exchange (GATHER, SINGLE, []) remote exchange (REPLICATE, BROADCAST, []) scan reason