diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java
index 058a14dd2fdca..6ffbe43bcdf37 100644
--- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java
+++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java
@@ -147,6 +147,7 @@ public final class SystemSessionProperties
     public static final String FAST_INEQUALITY_JOINS = "fast_inequality_joins";
     public static final String QUERY_PRIORITY = "query_priority";
     public static final String CONFIDENCE_BASED_BROADCAST_ENABLED = "confidence_based_broadcast_enabled";
+    public static final String TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED = "treat_low_confidence_zero_estimation_unknown_enabled";
     public static final String SPILL_ENABLED = "spill_enabled";
     public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
     public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
@@ -429,6 +430,11 @@ public SystemSessionProperties(
                         "Enable confidence based broadcasting when enabled",
                         false,
                         false),
+                booleanProperty(
+                        TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED,
+                        "Treat low confidence zero estimations as unknowns during joins when enabled",
+                        false,
+                        false),
                 booleanProperty(
                         DISTRIBUTED_INDEX_JOIN,
                         "Distribute index joins on join keys instead of executing inline",
@@ -2030,6 +2036,11 @@ public static boolean confidenceBasedBroadcastEnabled(Session session)
         return session.getSystemProperty(CONFIDENCE_BASED_BROADCAST_ENABLED, Boolean.class);
     }
 
+    public static boolean treatLowConfidenceZeroEstimationAsUnknownEnabled(Session session)
+    {
+        return session.getSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, Boolean.class);
+    }
+
     public static int getHashPartitionCount(Session session)
     {
         return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
diff --git 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/ConfidenceBasedBroadcastUtil.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/ConfidenceBasedBroadcastUtil.java
index aa425ee9a1654..4169b193c20ad 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/ConfidenceBasedBroadcastUtil.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/ConfidenceBasedBroadcastUtil.java
@@ -18,7 +18,10 @@
 
 import java.util.Optional;
 
+import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
 import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
+import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.isBelowMaxBroadcastSize;
+import static com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType.mustPartition;
 
 public class ConfidenceBasedBroadcastUtil
 {
@@ -38,4 +41,47 @@ else if (leftConfidence.getConfidenceOrdinal() > rightConfidence.getConfidenceOr
 
         return Optional.empty();
     }
+
+    /**
+     * Picks the join distribution when a zero-row estimate on either side is not trusted.
+     * <p>
+     * A side flagged here is never used as the broadcast side. The probe side is the left
+     * child and the build side is the right child of {@code joinNode}.
+     *
+     * @param probeSideLowConfidenceZero whether the left child has a LOW confidence zero-row estimate
+     * @param buildSideLowConfidenceZero whether the right child has a LOW confidence zero-row estimate
+     * @param joinNode join whose distribution type is being decided
+     * @param context rule context, used for the broadcast size check
+     * @return the join with a distribution type set (children flipped when the left child is
+     * broadcast), or {@link Optional#empty()} when neither flag is set
+     */
+    public static Optional<JoinNode> treatLowConfidenceZeroEstimationsAsUnknown(boolean probeSideLowConfidenceZero, boolean buildSideLowConfidenceZero, JoinNode joinNode, Rule.Context context)
+    {
+        if (buildSideLowConfidenceZero && probeSideLowConfidenceZero) {
+            // Neither estimate can be trusted, so do not broadcast either side.
+            return Optional.of(joinNode.withDistributionType(PARTITIONED));
+        }
+        else if (buildSideLowConfidenceZero) {
+            // Only the left child is trusted; it can be broadcast only after flipping, so both
+            // checks must be made on the flipped join that would actually be returned.
+            JoinNode flippedJoinNode = joinNode.flipChildren();
+            if (isBelowMaxBroadcastSize(flippedJoinNode, context) && !mustPartition(flippedJoinNode)) {
+                return Optional.of(flippedJoinNode.withDistributionType(REPLICATED));
+            }
+            else {
+                return Optional.of(joinNode.withDistributionType(PARTITIONED));
+            }
+        }
+        else if (probeSideLowConfidenceZero) {
+            if (isBelowMaxBroadcastSize(joinNode, context) && !mustPartition(joinNode)) {
+                return Optional.of(joinNode.withDistributionType(REPLICATED));
+            }
+            else {
+                return Optional.of(joinNode.withDistributionType(PARTITIONED));
+            }
+        }
+        else {
+            return Optional.empty();
+        }
+    }
 }
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java
index 59a05db76900c..e852558a7a672 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineJoinDistributionType.java
@@ -47,11 +47,14 @@
 import static com.facebook.presto.SystemSessionProperties.getJoinMaxBroadcastTableSize;
 import static com.facebook.presto.SystemSessionProperties.isSizeBasedJoinDistributionTypeEnabled;
 import static com.facebook.presto.SystemSessionProperties.isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled;
+import static com.facebook.presto.SystemSessionProperties.treatLowConfidenceZeroEstimationAsUnknownEnabled;
 import static com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges.calculateJoinCostWithoutOutput;
 import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
 import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
+import static com.facebook.presto.spi.statistics.SourceInfo.ConfidenceLevel.LOW;
 import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.AUTOMATIC;
 import static com.facebook.presto.sql.planner.iterative.ConfidenceBasedBroadcastUtil.confidenceBasedBroadcast;
+import static com.facebook.presto.sql.planner.iterative.ConfidenceBasedBroadcastUtil.treatLowConfidenceZeroEstimationsAsUnknown;
 import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isBelowBroadcastLimit;
 import static com.facebook.presto.sql.planner.iterative.rule.JoinSwappingUtils.isSmallerThanThreshold;
 import static com.facebook.presto.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar;
@@ -114,6 +117,11 @@ public static boolean isBelowMaxBroadcastSize(JoinNode 
joinNode, Context context
 
         PlanNode buildSide = joinNode.getRight();
         PlanNodeStatsEstimate buildSideStatsEstimate = context.getStatsProvider().getStats(buildSide);
+
+        if (treatLowConfidenceZeroEstimationAsUnknownEnabled(context.getSession()) && isLowConfidenceZero(buildSide, context)) {
+            return false;
+        }
+
         double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide);
         return buildSideSizeInBytes <= joinMaxBroadcastTableSize.toBytes()
                 || (isSizeBasedJoinDistributionTypeEnabled(context.getSession())
@@ -134,6 +142,17 @@ private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
             }
         }
 
+        // Check the session flag first so the stats lookups below only happen when the feature is on.
+        if (treatLowConfidenceZeroEstimationAsUnknownEnabled(context.getSession())) {
+            boolean buildSideLowConfidenceZero = isLowConfidenceZero(joinNode.getRight(), context);
+            boolean probeSideLowConfidenceZero = isLowConfidenceZero(joinNode.getLeft(), context);
+            // Empty when neither side has a low confidence zero estimate; then fall through to the cost based choice.
+            Optional<JoinNode> result = treatLowConfidenceZeroEstimationsAsUnknown(probeSideLowConfidenceZero, buildSideLowConfidenceZero, joinNode, context);
+            if (result.isPresent()) {
+                return result.get();
+            }
+        }
+
         if (possibleJoinNodes.stream().anyMatch(result -> result.getCost().hasUnknownComponents()) || possibleJoinNodes.isEmpty()) {
             // TODO: currently this session parameter is added so as to roll out the plan change gradually, after proved to be a better choice, make it default and get rid of the session parameter here.
             if (isUseBroadcastJoinWhenBuildSizeSmallProbeSizeUnknownEnabled(context.getSession()) && possibleJoinNodes.stream().anyMatch(result -> ((JoinNode) result.getPlanNode()).getDistributionType().get().equals(REPLICATED))) {
@@ -295,4 +314,14 @@ private PlanNodeWithCost getJoinNodeWithCost(Context context, JoinNode possibleJ
                 estimatedSourceDistributedTaskCount);
         return new PlanNodeWithCost(cost.toPlanCost(), possibleJoinNode);
     }
+
+    /**
+     * Returns whether the stats for {@code planNode} report zero output rows with LOW confidence.
+     * Callers in this class treat such an estimate as unknown rather than as an empty input.
+     */
+    private static boolean isLowConfidenceZero(PlanNode planNode, Context context)
+    {
+        PlanNodeStatsEstimate statsEstimate = context.getStatsProvider().getStats(planNode);
+        return statsEstimate.confidenceLevel() == LOW && statsEstimate.getOutputRowCount() == 0;
+    }
 }
diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java
index 56242a55b919a..47ec9dbc5030c 100644
--- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java
+++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineJoinDistributionType.java
@@ -45,6 +45,8 @@
 import static com.facebook.presto.SystemSessionProperties.CONFIDENCE_BASED_BROADCAST_ENABLED;
 import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
 import static com.facebook.presto.SystemSessionProperties.JOIN_MAX_BROADCAST_TABLE_SIZE;
+import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_BROADCAST_MEMORY;
+import static com.facebook.presto.SystemSessionProperties.TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED;
 import static com.facebook.presto.SystemSessionProperties.USE_BROADCAST_WHEN_BUILDSIZE_SMALL_PROBESIDE_UNKNOWN;
 import static com.facebook.presto.common.type.BigintType.BIGINT;
 import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
@@ -460,6 +462,259 @@ public void 
testLeftAndRightHighConfidenceLeftSmaller()
                         values(ImmutableMap.of("A1", 0, "A2", 1))));
     }
 
+    @Test
+    public void testJoinWithRightSideLowConfidenceZeroStatisticsLeftSideHighBroadcast()
+    {
+        int aRows = 50;
+        int bRows = 0;
+        assertDetermineJoinDistributionType()
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
+                .setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
+                .overrideStats("valuesA", PlanNodeStatsEstimate.builder()
+                        .setConfidence(HIGH)
+                        .setOutputRowCount(aRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .overrideStats("valuesB", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(bRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .on(p ->
+                        p.join(
+                                INNER,
+                                p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
+                                p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
+                                ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
+                                ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
+                                Optional.empty()))
+                .matches(join(
+                        INNER,
+                        ImmutableList.of(equiJoinClause("B1", "A1")),
+                        Optional.empty(),
+                        Optional.of(REPLICATED),
+                        values(ImmutableMap.of("B1", 0)),
+                        values(ImmutableMap.of("A1", 0))));
+    }
+
+    @Test
+    public void testJoinWithLeftSideLowConfidenceZeroStatisticsRightSideHighBroadcast()
+    {
+        int aRows = 0;
+        int bRows = 50;
+        assertDetermineJoinDistributionType()
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
+                .setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
+                .overrideStats("valuesA", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(aRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .overrideStats("valuesB", PlanNodeStatsEstimate.builder()
+                        .setConfidence(HIGH)
+                        .setOutputRowCount(bRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .on(p ->
+                        p.join(
+                                INNER,
+                                p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
+                                p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
+                                ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
+                                ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
+                                Optional.empty()))
+                .matches(join(
+                        INNER,
+                        ImmutableList.of(equiJoinClause("A1", "B1")),
+                        Optional.empty(),
+                        Optional.of(REPLICATED),
+                        values(ImmutableMap.of("A1", 0)),
+                        values(ImmutableMap.of("B1", 0))));
+    }
+
+    @Test
+    public void testJoinWithLeftSideLowConfidenceZeroStatisticsRightSidePartition()
+    {
+        int aRows = 0;
+        int bRows = 50_000;
+        assertDetermineJoinDistributionType()
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
+                .setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
+                .setSystemProperty(QUERY_MAX_BROADCAST_MEMORY, "5kB")
+                .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "5kB")
+                .overrideStats("valuesA", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(aRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .overrideStats("valuesB", PlanNodeStatsEstimate.builder()
+                        .setConfidence(HIGH)
+                        .setOutputRowCount(bRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .on(p ->
+                        p.join(
+                                INNER,
+                                p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
+                                p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
+                                ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
+                                ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
+                                Optional.empty()))
+                .matches(join(
+                        INNER,
+                        ImmutableList.of(equiJoinClause("A1", "B1")),
+                        Optional.empty(),
+                        Optional.of(PARTITIONED),
+                        values(ImmutableMap.of("A1", 0)),
+                        values(ImmutableMap.of("B1", 0))));
+    }
+
+    @Test
+    public void testJoinWithRightSideLowConfidenceZeroStatisticsLeftSideBigPartition()
+    {
+        int aRows = 50_000;
+        int bRows = 0;
+        assertDetermineJoinDistributionType()
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
+                .setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
+                .setSystemProperty(QUERY_MAX_BROADCAST_MEMORY, "5kB")
+                .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "5kB")
+                .overrideStats("valuesA", PlanNodeStatsEstimate.builder()
+                        .setConfidence(HIGH)
+                        .setOutputRowCount(aRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .overrideStats("valuesB", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(bRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .on(p ->
+                        p.join(
+                                INNER,
+                                p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
+                                p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
+                                ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
+                                ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
+                                Optional.empty()))
+                .matches(join(
+                        INNER,
+                        ImmutableList.of(equiJoinClause("A1", "B1")),
+                        Optional.empty(),
+                        Optional.of(PARTITIONED),
+                        values(ImmutableMap.of("A1", 0)),
+                        values(ImmutableMap.of("B1", 0))));
+    }
+
+    @Test
+    public void testJoinWithBothSideLowConfidenceZeroStatisticsPartition()
+    {
+        int aRows = 0;
+        int bRows = 0;
+        assertDetermineJoinDistributionType()
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
+                .setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
+                .overrideStats("valuesA", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(aRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .overrideStats("valuesB", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(bRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .on(p ->
+                        p.join(
+                                INNER,
+                                p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
+                                p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
+                                ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
+                                ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
+                                Optional.empty()))
+                .matches(join(
+                        INNER,
+                        ImmutableList.of(equiJoinClause("A1", "B1")),
+                        Optional.empty(),
+                        Optional.of(PARTITIONED),
+                        values(ImmutableMap.of("A1", 0)),
+                        values(ImmutableMap.of("B1", 0))));
+    }
+
+    @Test
+    public void testJoinWithRightSideLowConfidenceZeroStatisticsLeftSideLowConfidencePartition()
+    {
+        int aRows = 50_000;
+        int bRows = 0;
+        assertDetermineJoinDistributionType()
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
+                .setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
+                .setSystemProperty(QUERY_MAX_BROADCAST_MEMORY, "5kB")
+                .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "5kB")
+                .overrideStats("valuesA", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(aRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .overrideStats("valuesB", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(bRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .on(p ->
+                        p.join(
+                                INNER,
+                                p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
+                                p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
+                                ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
+                                ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
+                                Optional.empty()))
+                .matches(join(
+                        INNER,
+                        ImmutableList.of(equiJoinClause("A1", "B1")),
+                        Optional.empty(),
+                        Optional.of(PARTITIONED),
+                        values(ImmutableMap.of("A1", 0)),
+                        values(ImmutableMap.of("B1", 0))));
+    }
+
+    @Test
+    public void testJoinWithLeftSideLowConfidenceZeroStatisticsRightSideLowConfidencePartition()
+    {
+        int aRows = 0;
+        int bRows = 50_000;
+        assertDetermineJoinDistributionType()
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.AUTOMATIC.name())
+                .setSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, "true")
+                .setSystemProperty(QUERY_MAX_BROADCAST_MEMORY, "5kB")
+                .setSystemProperty(JOIN_MAX_BROADCAST_TABLE_SIZE, "5kB")
+                .overrideStats("valuesA", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(aRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "A1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .overrideStats("valuesB", PlanNodeStatsEstimate.builder()
+                        .setConfidence(LOW)
+                        .setOutputRowCount(bRows)
+                        .addVariableStatistics(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), "B1", BIGINT), new VariableStatsEstimate(0, 100, 0, 60, 100)))
+                        .build())
+                .on(p ->
+                        p.join(
+                                INNER,
+                                p.values(new PlanNodeId("valuesA"), aRows, p.variable("A1", BIGINT)),
+                                p.values(new PlanNodeId("valuesB"), bRows, p.variable("B1", BIGINT)),
+                                ImmutableList.of(new EquiJoinClause(p.variable("A1", BIGINT), p.variable("B1", BIGINT))),
+                                ImmutableList.of(p.variable("A1", BIGINT), p.variable("B1", BIGINT)),
+                                Optional.empty()))
+                .matches(join(
+                        INNER,
+                        ImmutableList.of(equiJoinClause("A1", "B1")),
+                        Optional.empty(),
+                        Optional.of(PARTITIONED),
+                        values(ImmutableMap.of("A1", 0)),
+                        values(ImmutableMap.of("B1", 0))));
+    }
+
     @Test
     public void testFlipAndReplicateWhenOneTableMuchSmaller()
     {