diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java index 0877f68a4cdde..7e65dd0bf5f14 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION; import static com.facebook.presto.SystemSessionProperties.PREFER_MERGE_JOIN; import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG; import static com.facebook.presto.hive.HiveSessionProperties.ORDER_BASED_EXECUTION_ENABLED; @@ -51,6 +52,8 @@ protected QueryRunner createQueryRunner() Optional.empty()); } + // todo add sorting properties for verification + @Test public void testSessionProperty() { @@ -67,8 +70,19 @@ public void testSessionProperty() " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); - // By default, we don't enable merge join + // By default, we can't enable merge join + assertPlan( + "select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey", + joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), false)); + + // when we miss session property, we can't enable merge join assertPlan( + missGroupedExecution(), + "select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey", + joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), false)); + + assertPlan( + missOrderBasedExecution(), "select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey", joinPlan("test_join_customer", "test_join_order", 
ImmutableList.of("custkey"), ImmutableList.of("custkey"), false)); @@ -175,36 +189,129 @@ public void testMultipleJoinKeys() try { queryRunner.execute("CREATE TABLE test_join_customer5 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'nationkey'], \n" + + " sorted_by = ARRAY['custkey', 'nationkey'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + queryRunner.execute("CREATE TABLE test_join_order5 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'orderkey'], \n" + + " sorted_by = ARRAY['custkey', 'orderkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM orders LIMIT 1000"); + + // merge join can be enabled + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer5 join test_join_order5 on test_join_customer5.custkey = test_join_order5.custkey and test_join_customer5.nationkey = test_join_order5.orderkey", + joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("custkey", "nationkey"), ImmutableList.of("custkey", "orderkey"), true)); + + // join order doesn't matter + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer5 join test_join_order5 on test_join_customer5.nationkey = test_join_order5.orderkey and test_join_customer5.custkey = test_join_order5.custkey", + joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("nationkey", "custkey"), ImmutableList.of("orderkey", "custkey"), true)); + + // left and right side join columns pairs don't match order property pairing + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer5 join test_join_order5 on test_join_customer5.custkey = test_join_order5.orderkey and test_join_customer5.nationkey = test_join_order5.custkey", + joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("custkey", "nationkey"), ImmutableList.of("orderkey", "custkey"), false)); + } + finally { + 
queryRunner.execute("DROP TABLE IF EXISTS test_join_customer5"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order5"); + } + } + + @Test + public void testOrderPropertyMatch() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer_order_property WITH ( \n" + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'phone'], \n" + " sorted_by = ARRAY['custkey', 'phone'], partitioned_by=array['ds'], \n" + " format = 'DWRF' ) AS \n" + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); - queryRunner.execute("CREATE TABLE test_join_order5 WITH ( \n" + + queryRunner.execute("CREATE TABLE test_join_order_order_property WITH ( \n" + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'clerk'], \n" + - " sorted_by = ARRAY['custkey', 'clerk'], partitioned_by=array['ds']) AS \n" + + " sorted_by = ARRAY['custkey DESC', 'clerk'], partitioned_by=array['ds']) AS \n" + "SELECT *, '2021-07-11' as ds FROM orders LIMIT 1000"); // merge join can't be enabled assertPlan( mergeJoinEnabled(), - "select * from test_join_customer5 join test_join_order5 on test_join_customer5.custkey = test_join_order5.custkey and test_join_customer5.phone = test_join_order5.clerk", - joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("custkey", "phone"), ImmutableList.of("custkey", "clerk"), true)); + "select * from test_join_customer_order_property join test_join_order_order_property on test_join_customer_order_property.custkey = test_join_order_order_property.custkey and test_join_customer_order_property.phone = test_join_order_order_property.clerk", + joinPlan("test_join_customer_order_property", "test_join_order_order_property", ImmutableList.of("custkey", "phone"), ImmutableList.of("custkey", "clerk"), false)); } finally { - queryRunner.execute("DROP TABLE IF EXISTS test_join_customer5"); - queryRunner.execute("DROP TABLE IF EXISTS test_join_order5"); + queryRunner.execute("DROP TABLE IF EXISTS 
test_join_customer_order_property"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order_order_property"); + } + } + + @Test + public void testMultiplePartitions() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer_multi_partitions WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + queryRunner.execute("INSERT INTO test_join_customer_multi_partitions \n" + + "SELECT *, '2021-07-12' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order_multi_partitions WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + queryRunner.execute("INSERT INTO test_join_order_multi_partitions \n" + + "SELECT *, '2021-07-12' as ds FROM tpch.sf1.orders LIMIT 1000"); + + // When the partition key does not appear in the join keys and we query multiple partitions, we can't enable merge join + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_multi_partitions join test_join_order_multi_partitions on test_join_customer_multi_partitions.custkey = test_join_order_multi_partitions.custkey", + joinPlan("test_join_customer_multi_partitions", "test_join_order_multi_partitions", ImmutableList.of("custkey"), ImmutableList.of("custkey"), false)); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_multi_partitions"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order_multi_partitions"); } } + private Session mergeJoinEnabled() { return Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(PREFER_MERGE_JOIN, "true") + .setSystemProperty(GROUPED_EXECUTION, "true") + 
.setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "true") + .build(); + } + + private Session missGroupedExecution() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(PREFER_MERGE_JOIN, "true") + .setSystemProperty(GROUPED_EXECUTION, "false") .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "true") .build(); } + private Session missOrderBasedExecution() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(PREFER_MERGE_JOIN, "true") + .setSystemProperty(GROUPED_EXECUTION, "true") + .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "false") + .build(); + } + private PlanMatchPattern joinPlan(String leftTableName, String rightTableName, List leftJoinKeys, List rightJoinKeys, boolean mergeJoinEnabled) { int suffix1 = 0; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index db7221a3d5535..8a1c8c2a11286 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -59,6 +59,7 @@ import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.JoinNode; +import com.facebook.presto.sql.planner.plan.MergeJoinNode; import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; import com.facebook.presto.sql.planner.plan.OutputNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; @@ -1082,6 +1083,41 @@ public GroupedExecutionProperties visitJoin(JoinNode node, Void context) } } + @Override + public GroupedExecutionProperties visitMergeJoin(MergeJoinNode node, Void context) + { + GroupedExecutionProperties left = node.getLeft().accept(this, null); + GroupedExecutionProperties right = 
node.getRight().accept(this, null); + + if (!groupedExecutionEnabled) { + // This is possible when the optimizer is invoked with `forceSingleNode` set to true. + return GroupedExecutionProperties.notCapable(); + } + + if (left.currentNodeCapable && right.currentNodeCapable) { + checkState(left.totalLifespans == right.totalLifespans, format("Mismatched number of lifespans on left(%s) and right(%s) side of join", left.totalLifespans, right.totalLifespans)); + return new GroupedExecutionProperties( + true, + true, + ImmutableList.builder() + .addAll(left.capableTableScanNodes) + .addAll(right.capableTableScanNodes) + .build(), + left.totalLifespans, + left.recoveryEligible && right.recoveryEligible); + } + // right.subTreeUseful && !left.currentNodeCapable: + // It's not particularly helpful to do grouped execution on the right side + // because the benefit is likely cancelled out due to required buffering for hash build. + // In theory, it could still be helpful (e.g. when the underlying aggregation's intermediate group state may be larger than aggregation output). + // However, this is not currently implemented. JoinBridgeManager needs to support such a lifecycle. + // !right.currentNodeCapable: + // The build/right side needs to buffer fully for this JOIN, but the probe/left side will still stream through. + // As a result, there is no reason to change currentNodeCapable or subTreeUseful to false. 
+ // + return left; + } + @Override public GroupedExecutionProperties visitAggregation(AggregationNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinOptimizer.java index 6fd836a0ab5f8..5f33a7073b671 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinOptimizer.java @@ -14,7 +14,10 @@ package com.facebook.presto.sql.planner.optimizations; import com.facebook.presto.Session; +import com.facebook.presto.common.block.SortOrder; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.LocalProperty; +import com.facebook.presto.spi.SortingProperty; import com.facebook.presto.spi.WarningCollector; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; @@ -27,9 +30,12 @@ import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.preferMergeJoin; -import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST; +import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.SINGLE; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; @@ -53,7 +59,7 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider type, Plan requireNonNull(variableAllocator, "variableAllocator is null"); requireNonNull(idAllocator, "idAllocator is null"); - if (preferMergeJoin(session)) { + if (preferMergeJoin(session) && 
isGroupedExecutionEnabled(session)) { return SimplePlanRewriter.rewriteWith(new MergeJoinOptimizer.Rewriter(variableAllocator, idAllocator, metadata, session), plan, null); } return plan; @@ -91,7 +97,8 @@ public PlanNode visitJoin(JoinNode node, RewriteContext context) // 2. If not, we don't optimize - if (isMergeJoinEligible(node.getLeft(), node.getRight(), node)) { + Optional orderPropertyPair = getMergeJoinOrderPropertyPair(node.getLeft(), node.getRight(), node); + if (orderPropertyPair.isPresent()) { return new MergeJoinNode( node.getSourceLocation(), node.getId(), @@ -99,6 +106,8 @@ public PlanNode visitJoin(JoinNode node, RewriteContext context) node.getLeft(), node.getRight(), node.getCriteria(), + orderPropertyPair.get().getLeftSortingProperties(), + orderPropertyPair.get().getRightSortingProperties(), node.getOutputVariables(), node.getFilter(), node.getLeftHashVariable(), @@ -107,20 +116,135 @@ public PlanNode visitJoin(JoinNode node, RewriteContext context) return node; } - private boolean isMergeJoinEligible(PlanNode left, PlanNode right, JoinNode node) + private Optional getMergeJoinOrderPropertyPair(PlanNode left, PlanNode right, JoinNode node) { // Acquire data properties for both left and right side StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, types, parser); StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, types, parser); - List leftJoinColumns = node.getCriteria().stream().map(JoinNode.EquiJoinClause::getLeft).collect(toImmutableList()); - List buildHashVariables = node.getCriteria().stream() - .map(JoinNode.EquiJoinClause::getRight) - .collect(toImmutableList()); + List rightJoinColumns = node.getCriteria().stream().map(JoinNode.EquiJoinClause::getRight).collect(toImmutableList()); + List> leftSortingProperties = 
leftProperties.getLocalProperties().stream().filter(SortingProperty.class::isInstance).collect(Collectors.toList()); + List> rightSortingProperties = rightProperties.getLocalProperties().stream().filter(SortingProperty.class::isInstance).collect(Collectors.toList()); + + // Check if the left and right sides' partitioning columns (bucketed-by columns) are a subset of join columns + // B = subset (J) + if (!verifyStreamProperties(leftProperties, leftJoinColumns) || !verifyStreamProperties(rightProperties, rightJoinColumns)) { + return Optional.empty(); + } + + // Check if the join columns have the same elements as the prefix of the sorted-by keys for both left and right sides + // J = Set (prefix(S)) + if (!verifyLocalProperties(leftSortingProperties, leftJoinColumns) || !verifyLocalProperties(rightSortingProperties, rightJoinColumns)) { + return Optional.empty(); + } + + // Check if the left and right join column pairs match the sorting property pairs (positions match and ASC/DESC match) + return orderPropertyPairingMatch(leftSortingProperties, leftJoinColumns, rightSortingProperties, rightJoinColumns); + } + + private boolean verifyStreamProperties(StreamPropertyDerivations.StreamProperties streamProperties, List joinColumns) + { + if (streamProperties.getDistribution() == SINGLE) { + return true; + } + else if (!streamProperties.getPartitioningColumns().isPresent()) { + return false; + } + else { + List partitioningColumns = streamProperties.getPartitioningColumns().get(); + return partitioningColumns.size() <= joinColumns.size() && joinColumns.containsAll(partitioningColumns); + } + } + + private boolean verifyLocalProperties(List> sortingProperties, List joinColumns) + { + // Logic in LocalProperties.match(sortingProperties, joinColumns) + // 1. Extract the longest prefix of sortingProperties to a set that is a subset of joinColumns + // 2. 
Iterate join columns and add the elements that are not in the set to the result + // Result would be a List of one element: Optional, GroupingProperty would contain one/multiple elements from step 2 + // E.g.: + // [A, B] [(B, A)] -> List.of(Optional.empty()) + // [A, B] [B] -> List.of(Optional.of(GroupingProperty(B))) + // [A, B] [A] -> List.of(Optional.empty()) + // [A, B] [(A, C)] -> List.of(Optional.of(GroupingProperty(C))) + // [A, B] [(D, A, C)] -> List.of(Optional.of(GroupingProperty(D, C))) + + // !isPresent() indicates the property was satisfied completely + return !LocalProperties.match(sortingProperties, LocalProperties.grouped(joinColumns)).get(0).isPresent(); + } + + private Optional orderPropertyPairingMatch(List> leftSortingProperties, List leftJoinColumns, List> rightSortingProperties, List rightJoinColumns) + { + // 1. Check if join column pairs match the sorting property pairs + // For example: Table A is sorted by [A, B], Table B is sorted by [C, D] + // Join criteria: + // A = C, B = D would work + // A = D, B = C wouldn't work + // B = D, A = C would work + + // 2. 
Check sorting property ASC/DESC matching + // For example: Table A is sorted by [A DESC, B], Table B is sorted by [C ASC, D] + // Join criteria: + // A = C, B = D wouldn't work + int joinColumnSize = leftJoinColumns.size(); + for (int i = 0; i < joinColumnSize; i++) { + VariableReferenceExpression leftJoinColumn = leftJoinColumns.get(i); + VariableReferenceExpression rightJoinColumn = rightJoinColumns.get(i); + int leftPosition = 0; + SortingProperty leftSortingProperty = null; + for (; leftPosition < leftSortingProperties.size(); leftPosition++) { + leftSortingProperty = (SortingProperty) leftSortingProperties.get(leftPosition); + if (leftSortingProperty.getColumn().equals(leftJoinColumn)) { + break; + } + } + int rightPosition = 0; + SortingProperty rightSortingProperty = null; + for (; rightPosition < rightSortingProperties.size(); rightPosition++) { + rightSortingProperty = (SortingProperty) rightSortingProperties.get(rightPosition); + if (rightSortingProperty.getColumn().equals(rightJoinColumn)) { + break; + } + } + + if (leftPosition != rightPosition + || !leftSortingProperty.getOrder().equals(rightSortingProperty.getOrder()) + || leftSortingProperty.isOrderSensitive() != rightSortingProperty.isOrderSensitive() ) { + return Optional.empty(); + } + } + return Optional.of(new SortingPropertyPair(leftSortingProperties.subList(0, leftJoinColumns.size()), rightSortingProperties.subList(0, rightJoinColumns.size()))); + } + } + + private class SortingPropertyPair + { + private List> leftSortingProperties; + private List> rightSortingProperties; - // Check if the left side and right side are both ordered by the join columns - return !LocalProperties.match(rightProperties.getLocalProperties(), LocalProperties.sorted(buildHashVariables, ASC_NULLS_FIRST)).get(0).isPresent() && - !LocalProperties.match(leftProperties.getLocalProperties(), LocalProperties.sorted(leftJoinColumns, ASC_NULLS_FIRST)).get(0).isPresent(); + public SortingPropertyPair(List> 
leftSortingProperties, List> rightSortingProperties) { + this.leftSortingProperties = leftSortingProperties.stream().map(property -> (SortingProperty) property).collect(Collectors.toList()); + this.rightSortingProperties = rightSortingProperties.stream().map(property -> (SortingProperty) property).collect(Collectors.toList()); + } + + public List> getLeftSortingProperties() + { + return leftSortingProperties; + } + + public void setLeftSortingProperties(List> leftSortingProperties) + { + this.leftSortingProperties = leftSortingProperties; + } + + public List> getRightSortingProperties() + { + return rightSortingProperties; + } + + public void setRightSortingProperties(List> rightSortingProperties) + { + this.rightSortingProperties = rightSortingProperties; } } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/MergeJoinNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/MergeJoinNode.java index e1e41e7b37070..da9430fa6bd83 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/MergeJoinNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/MergeJoinNode.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.sql.planner.plan; +import com.facebook.presto.spi.SortingProperty; import com.facebook.presto.spi.SourceLocation; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeId; @@ -38,6 +39,8 @@ public class MergeJoinNode private final PlanNode left; private final PlanNode right; private final List criteria; + private final List> leftSortingProperties; + private final List> rightSortingProperties; private final Optional filter; private final List outputVariables; private final Optional leftHashVariable; @@ -47,20 +50,24 @@ public class MergeJoinNode public MergeJoinNode( Optional sourceLocation, @JsonProperty ("id") PlanNodeId id, - @JsonProperty("type") JoinNode.Type type, - @JsonProperty("left") PlanNode left, - @JsonProperty("right") 
PlanNode right, - @JsonProperty("criteria") List criteria, - @JsonProperty("outputVariables") List outputVariables, - @JsonProperty("filter") Optional filter, - @JsonProperty("leftHashVariable") Optional leftHashVariable, - @JsonProperty("rightHashVariable") Optional rightHashVariable) + @JsonProperty ("type") JoinNode.Type type, + @JsonProperty ("left") PlanNode left, + @JsonProperty ("right") PlanNode right, + @JsonProperty ("criteria") List criteria, + @JsonProperty ("leftSortingProperties") List> leftSortingProperties, + @JsonProperty ("rightSortingProperties") List> rightSortingProperties, + @JsonProperty ("outputVariables") List outputVariables, + @JsonProperty ("filter") Optional filter, + @JsonProperty ("leftHashVariable") Optional leftHashVariable, + @JsonProperty ("rightHashVariable") Optional rightHashVariable) { super(sourceLocation, id); this.type = requireNonNull(type, "type is null"); this.left = requireNonNull(left, "left is null"); this.right = requireNonNull(right, "right is null"); this.criteria = ImmutableList.copyOf(requireNonNull(criteria, "criteria is null")); + this.leftSortingProperties = ImmutableList.copyOf(requireNonNull(leftSortingProperties, "leftOrderProperty is null")); + this.rightSortingProperties = ImmutableList.copyOf(requireNonNull(rightSortingProperties, "rightOrderProperty is null")); this.outputVariables = ImmutableList.copyOf(requireNonNull(outputVariables, "outputVariables is null")); this.filter = requireNonNull(filter, "filter is null"); this.leftHashVariable = requireNonNull(leftHashVariable, "leftHashVariable is null"); @@ -91,6 +98,18 @@ public List getCriteria() return criteria; } + @JsonProperty + public List> getLeftSortingProperties() + { + return leftSortingProperties; + } + + @JsonProperty + public List> getRightSortingProperties() + { + return rightSortingProperties; + } + @Override @JsonProperty public List getOutputVariables() @@ -126,7 +145,19 @@ public List getSources() public PlanNode replaceChildren(List 
newChildren) { checkArgument(newChildren.size() == 2, "expected newChildren to contain 2 nodes"); - return new MergeJoinNode(getSourceLocation(), getId(), type, newChildren.get(0), newChildren.get(1), criteria, outputVariables, filter, leftHashVariable, rightHashVariable); + return new MergeJoinNode( + getSourceLocation(), + getId(), + type, + newChildren.get(0), + newChildren.get(1), + criteria, + leftSortingProperties, + rightSortingProperties, + outputVariables, + filter, + leftHashVariable, + rightHashVariable); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index abb9fdf5dd174..079657bf488b6 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -581,7 +581,13 @@ public Void visitMergeJoin(MergeJoinNode node, Void context) addNode(node, "MergeJoin", - format("[type: %s], [%s]%s", node.getType().getJoinLabel(), Joiner.on(" AND ").join(joinExpressions), formatHash(node.getLeftHashVariable(), node.getRightHashVariable()))); + format( + "[type: %s], [%s]%s, [left sorting properties: %s], [right sorting properties: %s]", + node.getType().getJoinLabel(), + Joiner.on(" AND ").join(joinExpressions), + formatHash(node.getLeftHashVariable(), node.getRightHashVariable()), + node.getLeftSortingProperties(), + node.getRightSortingProperties())); node.getLeft().accept(this, context); node.getRight().accept(this, context); return null; diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/assertions/MergeJoinMatcher.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/assertions/MergeJoinMatcher.java index 8227846c7c1f4..37a6e097adcf4 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/assertions/MergeJoinMatcher.java +++ 
b/presto-main/src/test/java/com/facebook/presto/sql/planner/assertions/MergeJoinMatcher.java @@ -111,6 +111,8 @@ public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session ses return NO_MATCH; } + // todo: compare sortingProperties + return MatchResult.match(); }