Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ public final class SystemSessionProperties
public static final String REWRITE_CROSS_JOIN_OR_TO_INNER_JOIN = "rewrite_cross_join_or_to_inner_join";
public static final String REWRITE_CROSS_JOIN_ARRAY_CONTAINS_TO_INNER_JOIN = "rewrite_cross_join_array_contains_to_inner_join";
public static final String REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN = "rewrite_left_join_null_filter_to_semi_join";
public static final String USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN = "use_broadcast_when_buildsize_small_probeside_unknown";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -1591,6 +1592,11 @@ public SystemSessionProperties(
REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN,
"Rewrite left join with is null check to semi join",
featuresConfig.isLeftJoinNullFilterToSemiJoin(),
false),
booleanProperty(
USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN,
"Experimental: When probe side size is unknown but build size is within broadcast limit, choose broadcast join",
featuresConfig.isBroadcastJoinWithSmallBuildUnknownProbe(),
false));
}

Expand Down Expand Up @@ -2674,4 +2680,9 @@ public static boolean isRewriteLeftJoinNullFilterToSemiJoinEnabled(Session sessi
{
return session.getSystemProperty(REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN, Boolean.class);
}

public static boolean isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(Session session)
{
return session.getSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public class FeaturesConfig
private boolean rewriteCrossJoinWithArrayContainsFilterToInnerJoin = true;
private JoinNotNullInferenceStrategy joinNotNullInferenceStrategy = NONE;
private boolean leftJoinNullFilterToSemiJoin = true;
private boolean broadcastJoinWithSmallBuildUnknownProbe;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of default to false, and roll out gradually.


private boolean preProcessMetadataCalls;

Expand Down Expand Up @@ -2640,4 +2641,17 @@ public FeaturesConfig setLeftJoinNullFilterToSemiJoin(boolean leftJoinNullFilter
this.leftJoinNullFilterToSemiJoin = leftJoinNullFilterToSemiJoin;
return this;
}

public boolean isBroadcastJoinWithSmallBuildUnknownProbe()
{
return this.broadcastJoinWithSmallBuildUnknownProbe;
}

@Config("experimental.optimizer.broadcast-join-with-small-build-unknown-probe")
@ConfigDescription("Experimental: When probe side size is unknown but build size is within broadcast limit, choose broadcast join")
public FeaturesConfig setBroadcastJoinWithSmallBuildUnknownProbe(boolean broadcastJoinWithSmallBuildUnknownProbe)
{
this.broadcastJoinWithSmallBuildUnknownProbe = broadcastJoinWithSmallBuildUnknownProbe;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static com.facebook.presto.SystemSessionProperties.getJoinDistributionType;
import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize;
import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled;
import static com.facebook.presto.SystemSessionProperties.isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled;
import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC;
import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isBelowBroadcastLimit;
Expand All @@ -49,6 +50,8 @@
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED;
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED;
import static com.facebook.presto.sql.planner.plan.Patterns.join;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.Double.NaN;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -102,6 +105,10 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
addJoinsWithDifferentDistributions(joinNode.flipChildren(), possibleJoinNodes, context);

if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) {
// TODO: currently this session parameter is added so as to roll out the plan change gradually, after proved to be a better choice, make it default and get rid of the session parameter here.
if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) {
return getOnlyElement(possibleJoinNodes.stream().filter(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED)).map(x -> x.getPlanNode()).collect(toImmutableList()));
}
if (isSizeBasedJoinDistributionTypeEnabled(context.getSession())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this different from sizeBasedJoin here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size based join only considers the size of input table, even when the estimation of immediate probe/build input is available.

For example, if I have a query like
with B as (select * from t1 join t2 using (key)) select * from A Join B using (key),
if we have estimation of A unknown, but estimation of B is small, we will not have broadcast with size based join. As in size based join, the size of t1 and t2 considered not representative of size of B after join operation. This is one of the case this PR is trying to solve.

I also thought about patching to the size based join, however, the size based join produces query plans for cases which I do not need here, and I will still need to have a separate session parameter inside the size based join implementation to control it. And it also makes the logic of size based join more complex. Hence I chose to have it as a separate part here.

return getSizeBasedJoin(joinNode, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ public void testDefaults()
.setDefaultJoinSelectivityCoefficient(0)
.setRewriteCrossJoinWithOrFilterToInnerJoin(true)
.setRewriteCrossJoinWithArrayContainsFilterToInnerJoin(true)
.setLeftJoinNullFilterToSemiJoin(true));
.setLeftJoinNullFilterToSemiJoin(true)
.setBroadcastJoinWithSmallBuildUnknownProbe(false));
}

@Test
Expand Down Expand Up @@ -419,6 +420,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.rewrite-cross-join-with-array-contains-filter-to-inner-join", "false")
.put("optimizer.default-join-selectivity-coefficient", "0.5")
.put("optimizer.rewrite-left-join-with-null-filter-to-semi-join", "false")
.put("experimental.optimizer.broadcast-join-with-small-build-unknown-probe", "true")
.build();

FeaturesConfig expected = new FeaturesConfig()
Expand Down Expand Up @@ -599,7 +601,8 @@ public void testExplicitPropertyMappings()
.setPushDownFilterExpressionEvaluationThroughCrossJoin(PushDownFilterThroughCrossJoinStrategy.DISABLED)
.setRewriteCrossJoinWithOrFilterToInnerJoin(false)
.setRewriteCrossJoinWithArrayContainsFilterToInnerJoin(false)
.setLeftJoinNullFilterToSemiJoin(false);
.setLeftJoinNullFilterToSemiJoin(false)
.setBroadcastJoinWithSmallBuildUnknownProbe(true);
assertFullMapping(properties, expected);
}

Expand Down
Loading