Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.HANDLE_COMPLEX_EQUI_JOINS;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_JOINS_WITH_EMPTY_SOURCES;
Expand Down Expand Up @@ -54,7 +55,8 @@ public TestTpcdsCostBasedPlan()
.setSystemProperty("task_concurrency", "1") // these tests don't handle exchanges from local parallel
.setSystemProperty(JOIN_REORDERING_STRATEGY, JoinReorderingStrategy.AUTOMATIC.name())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(OPTIMIZE_JOINS_WITH_EMPTY_SOURCES, "false");
.setSystemProperty(OPTIMIZE_JOINS_WITH_EMPTY_SOURCES, "false")
.setSystemProperty(HANDLE_COMPLEX_EQUI_JOINS, "true");

LocalQueryRunner queryRunner = LocalQueryRunner.queryRunnerWithFakeNodeCountForStats(sessionBuilder.build(), 8);
queryRunner.createCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.HANDLE_COMPLEX_EQUI_JOINS;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
Expand Down Expand Up @@ -54,7 +55,8 @@ public TestTpchCostBasedPlan()
.setSchema("sf3000.0")
.setSystemProperty("task_concurrency", "1") // these tests don't handle exchanges from local parallel
.setSystemProperty(JOIN_REORDERING_STRATEGY, JoinReorderingStrategy.AUTOMATIC.name())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name());
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(HANDLE_COMPLEX_EQUI_JOINS, "true");

LocalQueryRunner queryRunner = LocalQueryRunner.queryRunnerWithFakeNodeCountForStats(sessionBuilder.build(), 8);
queryRunner.createCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@ remote exchange (GATHER, SINGLE, [])
local exchange (GATHER, UNKNOWN, [])
remote exchange (REPARTITION, ROUND_ROBIN, [])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, [subtract_400])
join (INNER, PARTITIONED):
final aggregation over (d_week_seq_232)
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_week_seq_232])
partial aggregation over (d_week_seq_232)
join (INNER, REPLICATED):
remote exchange (REPARTITION, ROUND_ROBIN, [])
scan web_sales
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_week_seq_316])
scan date_dim
join (INNER, PARTITIONED):
final aggregation over (d_week_seq)
local exchange (GATHER, SINGLE, [])
Expand All @@ -17,20 +33,3 @@ remote exchange (GATHER, SINGLE, [])
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_week_seq_83])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [subtract])
join (INNER, PARTITIONED):
final aggregation over (d_week_seq_232)
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_week_seq_232])
partial aggregation over (d_week_seq_232)
join (INNER, REPLICATED):
remote exchange (REPARTITION, ROUND_ROBIN, [])
scan web_sales
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [d_week_seq_316])
scan date_dim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local exchange (GATHER, SINGLE, [])
remote exchange (GATHER, SINGLE, [])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, [d_week_seq, s_store_id])
remote exchange (REPARTITION, HASH, [d_week_seq, d_week_seq_267, s_store_id])
join (INNER, REPLICATED):
join (INNER, REPLICATED):
final aggregation over (d_week_seq, ss_store_sk)
Expand All @@ -20,7 +20,7 @@ local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan store
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [s_store_id_235, subtract])
remote exchange (REPARTITION, HASH, [d_week_seq_147, d_week_seq_63, s_store_id_235])
join (INNER, REPLICATED):
join (INNER, REPLICATED):
final aggregation over (d_week_seq_147, ss_store_sk_127)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public final class SystemSessionProperties
public static final String INFER_INEQUALITY_PREDICATES = "infer_inequality_predicates";
public static final String ENABLE_HISTORY_BASED_SCALED_WRITER = "enable_history_based_scaled_writer";
public static final String REMOVE_REDUNDANT_CAST_TO_VARCHAR_IN_JOIN = "remove_redundant_cast_to_varchar_in_join";
public static final String HANDLE_COMPLEX_EQUI_JOINS = "handle_complex_equi_joins";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "native_simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -1775,6 +1776,11 @@ public SystemSessionProperties(
REMOVE_REDUNDANT_CAST_TO_VARCHAR_IN_JOIN,
"If both left and right side of join clause are varchar cast from int/bigint, remove the cast here",
featuresConfig.isRemoveRedundantCastToVarcharInJoin(),
false),
booleanProperty(
HANDLE_COMPLEX_EQUI_JOINS,
"Handle complex equi-join conditions to open up join space for join reordering",
featuresConfig.getHandleComplexEquiJoins(),
false));
}

Expand Down Expand Up @@ -2959,4 +2965,9 @@ public static boolean isRemoveRedundantCastToVarcharInJoinEnabled(Session sessio
{
return session.getSystemProperty(REMOVE_REDUNDANT_CAST_TO_VARCHAR_IN_JOIN, Boolean.class);
}

public static boolean shouldHandleComplexEquiJoins(Session session)
{
return session.getSystemProperty(HANDLE_COMPLEX_EQUI_JOINS, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ public class FeaturesConfig
private boolean rewriteConstantArrayContainsToIn;

private boolean preProcessMetadataCalls;
private boolean handleComplexEquiJoins;
private boolean useHBOForScaledWriters;

private boolean removeRedundantCastToVarcharInJoin = true;
Expand Down Expand Up @@ -2831,4 +2832,17 @@ public FeaturesConfig setRemoveRedundantCastToVarcharInJoin(boolean removeRedund
this.removeRedundantCastToVarcharInJoin = removeRedundantCastToVarcharInJoin;
return this;
}

public boolean getHandleComplexEquiJoins()
{
return handleComplexEquiJoins;
}

@Config("optimizer.handle-complex-equi-joins")
@ConfigDescription("Handle complex equi-join conditions to open up join space for join reordering")
public FeaturesConfig setHandleComplexEquiJoins(boolean handleComplexEquiJoins)
{
this.handleComplexEquiJoins = handleComplexEquiJoins;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,16 @@ public int hashCode()
{
return Objects.hash(sources, type, criteria, filters, outputVariables);
}

@Override
public String toString()
{
return "CanonicalJoinNode{" +
"sources=" + sources +
", type=" + type +
", criteria=" + criteria +
", filters=" + filters +
", outputVariables=" + outputVariables +
'}';
}
}
Loading