Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.benchmark;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.intellij.lang.annotations.Language;

import java.util.Map;

import static com.facebook.presto.SystemSessionProperties.EXPLOIT_CONSTRAINTS;
import static com.facebook.presto.SystemSessionProperties.IN_PREDICATES_AS_INNER_JOINS_ENABLED;
import static com.facebook.presto.SystemSessionProperties.PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD;

public class SqlEarlyOutJoinsBenchmarks
extends AbstractSqlBenchmark
{
private static final Logger LOGGER = Logger.get(SqlEarlyOutJoinsBenchmarks.class);

private static Map<String, String> disableOptimization = ImmutableMap.of(IN_PREDICATES_AS_INNER_JOINS_ENABLED, Boolean.toString(false),
EXPLOIT_CONSTRAINTS, Boolean.toString(true));
private static Map<String, String> enableOptimization = ImmutableMap.of(IN_PREDICATES_AS_INNER_JOINS_ENABLED, Boolean.toString(true),
EXPLOIT_CONSTRAINTS, Boolean.toString(true));

public SqlEarlyOutJoinsBenchmarks(LocalQueryRunner localQueryRunner, @Language("SQL") String sql)
{
super(localQueryRunner, "early_out_joins", 10, 10, sql);
}

public static void main(String[] args)
{
benchmarkTransformDistinctInnerJoinToLeftEarlyOutJoin();
benchmarkTransformDistinctInnerJoinToRightEarlyOutJoin();
benchmarkRewriteOfInPredicateToDistinctInnerJoin();
}

private static void benchmarkTransformDistinctInnerJoinToLeftEarlyOutJoin()
{
LOGGER.info("benchmarkTransformDistinctInnerJoinToLeftEarlyOutJoin");
String sql = "select distinct orderkey from lineitem, nation where orderkey=nationkey";
LOGGER.info("Without optimization");
new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(disableOptimization), sql).runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
LOGGER.info("With optimization");
new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(enableOptimization), sql).runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
}

private static void benchmarkTransformDistinctInnerJoinToRightEarlyOutJoin()
{
LOGGER.info("benchmarkTransformDistinctInnerJoinToRightEarlyOutJoin");
String sql = "select distinct l.orderkey, l.comment from lineitem l, orders o where l.orderkey = o.orderkey";
LOGGER.info("Without optimization");
new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(disableOptimization), sql).runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
LOGGER.info("With optimization");
new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(enableOptimization), sql).runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
}

private static void benchmarkRewriteOfInPredicateToDistinctInnerJoin()
{
LOGGER.info("benchmarkInPredicateToDistinctInnerJoin");
LOGGER.info("Case 1: Rewrite IN predicate to distinct + inner join");
String sql = " explain select * from region where regionkey in (select orderkey from lineitem)";
LOGGER.info("Without optimization");
new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(disableOptimization), sql).runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
LOGGER.info("With optimization: case 1");
new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(enableOptimization), sql).runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));

LOGGER.info("Case 2: Rewrite IN predicate to distinct + inner join and then push aggregation down into the probe of the join");
//Use same query as previous and change the byte reduction threshold
LOGGER.info("With optimization: case 2");
Map<String, String> alteredByteReductionThreshold = ImmutableMap.<String, String>builder()
.putAll(enableOptimization)
.put(PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD, "0.001")
.build();
new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(enableOptimization), sql).runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public final class SystemSessionProperties
public static final String PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID = "push_remote_exchange_through_group_id";
public static final String OPTIMIZE_MULTIPLE_APPROX_PERCENTILE_ON_SAME_FIELD = "optimize_multiple_approx_percentile_on_same_field";
public static final String RANDOMIZE_OUTER_JOIN_NULL_KEY = "randomize_outer_join_null_key";
public static final String IN_PREDICATES_AS_INNER_JOINS_ENABLED = "in_predicates_as_inner_joins_enabled";
public static final String PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD = "push_aggregation_below_join_byte_reduction_threshold";
public static final String KEY_BASED_SAMPLING_ENABLED = "key_based_sampling_enabled";
public static final String KEY_BASED_SAMPLING_PERCENTAGE = "key_based_sampling_percentage";
public static final String KEY_BASED_SAMPLING_FUNCTION = "key_based_sampling_function";
Expand Down Expand Up @@ -1421,6 +1423,15 @@ public SystemSessionProperties(
REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED,
"Enable removing distinct aggregation node if input is already distinct",
featuresConfig.isRemoveRedundantDistinctAggregationEnabled(),
false),
booleanProperty(IN_PREDICATES_AS_INNER_JOINS_ENABLED,
"Enable transformation of IN predicates to inner joins",
featuresConfig.isInPredicatesAsInnerJoinsEnabled(),
false),
doubleProperty(
PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD,
"Byte reduction ratio threshold at which to disable pushdown of aggregation below inner join",
featuresConfig.getPushAggregationBelowJoinByteReductionThreshold(),
false));
}

Expand Down Expand Up @@ -2381,4 +2392,14 @@ public static boolean isRemoveRedundantDistinctAggregationEnabled(Session sessio
{
return session.getSystemProperty(REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED, Boolean.class);
}

public static boolean isInPredicatesAsInnerJoinsEnabled(Session session)
{
return session.getSystemProperty(IN_PREDICATES_AS_INNER_JOINS_ENABLED, Boolean.class);
}

public static double getPushAggregationBelowJoinByteReductionThreshold(Session session)
{
return session.getSystemProperty(PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD, Double.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public class FeaturesConfig
private boolean randomizeOuterJoinNullKey;
private boolean isOptimizeConditionalAggregationEnabled;
private boolean isRemoveRedundantDistinctAggregationEnabled = true;
private boolean inPredicatesAsInnerJoinsEnabled;
private double pushAggregationBelowJoinByteReductionThreshold = 1;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2287,4 +2289,30 @@ public FeaturesConfig setRemoveRedundantDistinctAggregationEnabled(boolean isRem
this.isRemoveRedundantDistinctAggregationEnabled = isRemoveRedundantDistinctAggregationEnabled;
return this;
}

public boolean isInPredicatesAsInnerJoinsEnabled()
{
return inPredicatesAsInnerJoinsEnabled;
}

@Config("optimizer.in-predicates-as-inner-joins-enabled")
@ConfigDescription("Enable rewrite of In predicates to INNER joins")
public FeaturesConfig setInPredicatesAsInnerJoinsEnabled(boolean inPredicatesAsInnerJoinsEnabled)
{
this.inPredicatesAsInnerJoinsEnabled = inPredicatesAsInnerJoinsEnabled;
return this;
}

public double getPushAggregationBelowJoinByteReductionThreshold()
{
return pushAggregationBelowJoinByteReductionThreshold;
}

@Config("optimizer.push-aggregation-below-join-byte-reduction-threshold")
@ConfigDescription("Byte reduction ratio threshold at which to disable pushdown of aggregation below inner join")
public FeaturesConfig setPushAggregationBelowJoinByteReductionThreshold(double pushAggregationBelowJoinByteReductionThreshold)
{
this.pushAggregationBelowJoinByteReductionThreshold = pushAggregationBelowJoinByteReductionThreshold;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@
import com.facebook.presto.sql.planner.iterative.rule.TransformCorrelatedScalarAggregationToJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformCorrelatedScalarSubquery;
import com.facebook.presto.sql.planner.iterative.rule.TransformCorrelatedSingleRowSubqueryToProject;
import com.facebook.presto.sql.planner.iterative.rule.TransformDistinctInnerJoinToLeftEarlyOutJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformDistinctInnerJoinToRightEarlyOutJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformExistsApplyToLateralNode;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToDistinctInnerJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin;
import com.facebook.presto.sql.planner.iterative.rule.TranslateExpressions;
Expand Down Expand Up @@ -395,6 +398,7 @@ public PlanOptimizers(
ImmutableSet.of(
new RemoveUnreferencedScalarLateralNodes(),
new TransformUncorrelatedLateralToJoin(),
new TransformUncorrelatedInPredicateSubqueryToDistinctInnerJoin(),
new TransformUncorrelatedInPredicateSubqueryToSemiJoin(),
new TransformCorrelatedScalarAggregationToJoin(metadata.getFunctionAndTypeManager()),
new TransformCorrelatedLateralJoinToJoin())),
Expand Down Expand Up @@ -599,6 +603,24 @@ public PlanOptimizers(
// We run it again to mark this for intermediate join nodes.
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new HistoricalStatisticsEquivalentPlanMarkingOptimizer(statsCalculator)));

// Run this set of join transformations after ReorderJoins, but before DetermineJoinDistributionType
builder.add(new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
Optional.of(new LogicalPropertiesProviderImpl(new FunctionResolution(metadata.getFunctionAndTypeManager()))),
ImmutableSet.of(
new TransformDistinctInnerJoinToLeftEarlyOutJoin(),
new TransformDistinctInnerJoinToRightEarlyOutJoin(),
new RemoveRedundantDistinct(),
new RemoveRedundantTopN(),
new RemoveRedundantSort(),
new RemoveRedundantLimit(),
new RemoveRedundantDistinctLimit(),
new RemoveRedundantAggregateDistinct(),
new RemoveRedundantIdentityProjections(),
new PushAggregationThroughOuterJoin(metadata.getFunctionAndTypeManager()))));

builder.add(new IterativeOptimizer(
ruleStats,
statsCalculator,
Expand Down
Loading