diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index ab28961632fee..47562f204f637 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -274,6 +274,7 @@ public final class SystemSessionProperties public static final String REWRITE_CROSS_JOIN_OR_TO_INNER_JOIN = "rewrite_cross_join_or_to_inner_join"; public static final String REWRITE_CROSS_JOIN_ARRAY_CONTAINS_TO_INNER_JOIN = "rewrite_cross_join_array_contains_to_inner_join"; public static final String REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN = "rewrite_left_join_null_filter_to_semi_join"; + public static final String USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN = "use_broadcast_when_buildsize_small_probeside_unknown"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled"; @@ -1591,6 +1592,11 @@ public SystemSessionProperties( REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN, "Rewrite left join with is null check to semi join", featuresConfig.isLeftJoinNullFilterToSemiJoin(), + false), + booleanProperty( + USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, + "Experimental: When probe side size is unknown but build size is within broadcast limit, choose broadcast join", + featuresConfig.isBroadcastJoinWithSmallBuildUnknownProbe(), false)); } @@ -2674,4 +2680,9 @@ public static boolean isRewriteLeftJoinNullFilterToSemiJoinEnabled(Session sessi { return session.getSystemProperty(REWRITE_LEFT_JOIN_NULL_FILTER_TO_SEMI_JOIN, Boolean.class); } + + public static boolean isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(Session session) + { + return session.getSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, Boolean.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 70f39e9fe1d9d..402844e973953 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -268,6 +268,7 @@ public class FeaturesConfig private boolean rewriteCrossJoinWithArrayContainsFilterToInnerJoin = true; private JoinNotNullInferenceStrategy joinNotNullInferenceStrategy = NONE; private boolean leftJoinNullFilterToSemiJoin = true; + private boolean broadcastJoinWithSmallBuildUnknownProbe; private boolean preProcessMetadataCalls; @@ -2640,4 +2641,17 @@ public FeaturesConfig setLeftJoinNullFilterToSemiJoin(boolean leftJoinNullFilter this.leftJoinNullFilterToSemiJoin = leftJoinNullFilterToSemiJoin; return this; } + + public boolean isBroadcastJoinWithSmallBuildUnknownProbe() + { + return this.broadcastJoinWithSmallBuildUnknownProbe; + } + + @Config("experimental.optimizer.broadcast-join-with-small-build-unknown-probe") + @ConfigDescription("Experimental: When probe side size is unknown but build size is within broadcast limit, choose broadcast join") + public FeaturesConfig setBroadcastJoinWithSmallBuildUnknownProbe(boolean broadcastJoinWithSmallBuildUnknownProbe) + { + this.broadcastJoinWithSmallBuildUnknownProbe = broadcastJoinWithSmallBuildUnknownProbe; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java index 25fe341dda7f5..2d62d1bfbe15e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java @@ -41,6 +41,7 @@ import static com.facebook.presto.SystemSessionProperties.getJoinDistributionType; import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize; import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled; +import static com.facebook.presto.SystemSessionProperties.isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled; import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC; import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isBelowBroadcastLimit; @@ -49,6 +50,8 @@ import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED; import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED; import static com.facebook.presto.sql.planner.plan.Patterns.join; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.Double.NaN; import static java.util.Objects.requireNonNull; @@ -102,6 +105,10 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context) addJoinsWithDifferentDistributions(joinNode.flipChildren(), possibleJoinNodes, context); if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) { + // TODO: currently this session parameter is added so as to roll out the plan change gradually, after proved to be a better choice, make it default and get rid of the session parameter here. + if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) { + return getOnlyElement(possibleJoinNodes.stream().filter(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED)).map(x -> x.getPlanNode()).collect(toImmutableList())); + } if (isSizeBasedJoinDistributionTypeEnabled(context.getSession())) { return getSizeBasedJoin(joinNode, context); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 001c7e4f1be6f..fe06e308c623d 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -235,7 +235,8 @@ public void testDefaults() .setDefaultJoinSelectivityCoefficient(0) .setRewriteCrossJoinWithOrFilterToInnerJoin(true) .setRewriteCrossJoinWithArrayContainsFilterToInnerJoin(true) - .setLeftJoinNullFilterToSemiJoin(true)); + .setLeftJoinNullFilterToSemiJoin(true) + .setBroadcastJoinWithSmallBuildUnknownProbe(false)); } @Test @@ -419,6 +420,7 @@ public void testExplicitPropertyMappings() .put("optimizer.rewrite-cross-join-with-array-contains-filter-to-inner-join", "false") .put("optimizer.default-join-selectivity-coefficient", "0.5") .put("optimizer.rewrite-left-join-with-null-filter-to-semi-join", "false") + .put("experimental.optimizer.broadcast-join-with-small-build-unknown-probe", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -599,7 +601,8 @@ public void testExplicitPropertyMappings() .setPushDownFilterExpressionEvaluationThroughCrossJoin(PushDownFilterThroughCrossJoinStrategy.DISABLED) .setRewriteCrossJoinWithOrFilterToInnerJoin(false) .setRewriteCrossJoinWithArrayContainsFilterToInnerJoin(false) - .setLeftJoinNullFilterToSemiJoin(false); + .setLeftJoinNullFilterToSemiJoin(false) + .setBroadcastJoinWithSmallBuildUnknownProbe(true); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java index 2929d7eff8745..5bb856cadcc1e 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java @@ -45,6 +45,7 @@ import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static com.facebook.presto.SystemSessionProperties.JOIN_MAX_BROADCAST_TABLE_SIZE; +import static com.facebook.presto.SystemSessionProperties.USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; @@ -816,6 +817,357 @@ public void testReplicatesWhenSourceIsSmall() filter("true", values(ImmutableMap.of("B1", 0))))); } + @Test + public void testReplicatesWhenOneSourceIsSmallAndTheOtherUnknown() + { + VarcharType variableType = createUnboundedVarcharType(); // variable width so that average row size is respected + int aRows = 10_000; + int bRows = 10; + + // output size does not exceed JOIN_MAX_BROADCAST_TABLE_SIZE limit + PlanNodeStatsEstimate bStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 64, 10))) + .build(); + + PlanNodeStatsEstimate bLargeStatsEstimate = PlanNodeStatsEstimate.builder() + .setOutputRowCount(bRows) + .addVariableStatistics(ImmutableMap.of( + new VariableReferenceExpression(Optional.empty(), "B1", variableType), + new VariableStatsEstimate(0, 100, 0, 640000d * 10000, 10))) + .build(); + + // flip and broadcast + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesB"), bRows, b1), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bLargeStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesB"), bRows, b1), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("B1", "A1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("B1", 0)), + values(ImmutableMap.of("A1", 0)))); + + // broadcast + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.values(new PlanNodeId("valuesB"), bRows, b1), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.values(new PlanNodeId("valuesB"), bRows, b1), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bLargeStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + INNER, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.values(new PlanNodeId("valuesB"), bRows, b1), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + INNER, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + // Right join cannot be broadcast + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + LEFT, + p.values(new PlanNodeId("valuesB"), bRows, b1), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + RIGHT, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + LEFT, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.values(new PlanNodeId("valuesB"), bRows, b1), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + LEFT, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bLargeStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + LEFT, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.values(new PlanNodeId("valuesB"), bRows, b1), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + LEFT, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + RIGHT, + p.values(new PlanNodeId("valuesB"), bRows, b1), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + LEFT, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(REPLICATED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bLargeStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + RIGHT, + p.values(new PlanNodeId("valuesB"), bRows, b1), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + RIGHT, + ImmutableList.of(equiJoinClause("B1", "A1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("B1", 0)), + values(ImmutableMap.of("A1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + RIGHT, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.values(new PlanNodeId("valuesB"), bRows, b1), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + RIGHT, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + FULL, + p.values(new PlanNodeId("valuesB"), bRows, b1), + p.values(new PlanNodeId("valuesA"), aRows, a1), + ImmutableList.of(new JoinNode.EquiJoinClause(b1, a1)), + ImmutableList.of(b1, a1), + Optional.empty()); + }) + .matches(join( + FULL, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + + assertDetermineJoinDistributionType() + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name()) + .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "100MB") + .setSystemProperty(USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN, "true") + .overrideStats("valuesA", PlanNodeStatsEstimate.unknown()) + .overrideStats("valuesB", bStatsEstimate) + .on(p -> { + VariableReferenceExpression a1 = p.variable("A1", variableType); + VariableReferenceExpression b1 = p.variable("B1", variableType); + return p.join( + FULL, + p.values(new PlanNodeId("valuesA"), aRows, a1), + p.values(new PlanNodeId("valuesB"), bRows, b1), + ImmutableList.of(new JoinNode.EquiJoinClause(a1, b1)), + ImmutableList.of(a1, b1), + Optional.empty()); + }) + .matches(join( + FULL, + ImmutableList.of(equiJoinClause("A1", "B1")), + Optional.empty(), + Optional.of(PARTITIONED), + values(ImmutableMap.of("A1", 0)), + values(ImmutableMap.of("B1", 0)))); + } + @Test public void testFlipWhenSizeDifferenceLarge() {