Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.PREFER_MERGE_JOIN;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.ORDER_BASED_EXECUTION_ENABLED;
Expand Down Expand Up @@ -51,6 +52,8 @@ protected QueryRunner createQueryRunner()
Optional.empty());
}

// todo add sorting properties for verification

@Test
public void testSessionProperty()
{
Expand All @@ -67,8 +70,19 @@ public void testSessionProperty()
" sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" +
"SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000");

// By default, we don't enable merge join
// By default, we can't enable merge join
assertPlan(
"select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey",
joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), false));

// when we miss session property, we can't enable merge join
assertPlan(
missGroupedExecution(),
"select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey",
joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), false));

assertPlan(
missOrderBasedExecution(),
"select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey",
joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), false));

Expand Down Expand Up @@ -175,36 +189,129 @@ public void testMultipleJoinKeys()

try {
queryRunner.execute("CREATE TABLE test_join_customer5 WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'nationkey'], \n" +
" sorted_by = ARRAY['custkey', 'nationkey'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

queryRunner.execute("CREATE TABLE test_join_order5 WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'orderkey'], \n" +
" sorted_by = ARRAY['custkey', 'orderkey'], partitioned_by=array['ds']) AS \n" +
"SELECT *, '2021-07-11' as ds FROM orders LIMIT 1000");

// merge join can be enabled
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer5 join test_join_order5 on test_join_customer5.custkey = test_join_order5.custkey and test_join_customer5.nationkey = test_join_order5.orderkey",
joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("custkey", "nationkey"), ImmutableList.of("custkey", "orderkey"), true));

// join order doesn't matter
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer5 join test_join_order5 on test_join_customer5.nationkey = test_join_order5.orderkey and test_join_customer5.custkey = test_join_order5.custkey",
joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("nationkey", "custkey"), ImmutableList.of("orderkey", "custkey"), true));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be supported at execution time. Currently we don't have a preSortedColumns field to indicate the sort-by columns, so at execution time we use the order of the join keys as the order of the sorted keys. This plan implies that the left table is ordered by (nationkey, custkey), which is incorrect.

We need to either add a PreSortedColumns field or disable this case (where the join key order doesn't match the sort-by column order) for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I made this comment in the test file, this is actually about the new logic.

Copy link
Copy Markdown
Collaborator Author

@kewang1024 kewang1024 Apr 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry. I only added the new verification logic and didn't add PreSortedColumns because I wanted to discuss the official format with you in our 1:1.

For now I have just added two fields in the PR as an initial plan; let me know what you think.


// left and right side join columns pairs don't match order property pairing
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer5 join test_join_order5 on test_join_customer5.custkey = test_join_order5.orderkey and test_join_customer5.nationkey = test_join_order5.custkey",
joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("custkey", "nationkey"), ImmutableList.of("orderkey", "custkey"), false));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_join_customer5");
queryRunner.execute("DROP TABLE IF EXISTS test_join_order5");
}
}

@Test
public void testOrderPropertyMatch()
{
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_join_customer_order_property WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'phone'], \n" +
" sorted_by = ARRAY['custkey', 'phone'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

queryRunner.execute("CREATE TABLE test_join_order5 WITH ( \n" +
queryRunner.execute("CREATE TABLE test_join_order_order_property WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'clerk'], \n" +
" sorted_by = ARRAY['custkey', 'clerk'], partitioned_by=array['ds']) AS \n" +
" sorted_by = ARRAY['custkey DESC', 'clerk'], partitioned_by=array['ds']) AS \n" +
"SELECT *, '2021-07-11' as ds FROM orders LIMIT 1000");

// merge join can't be enabled
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer5 join test_join_order5 on test_join_customer5.custkey = test_join_order5.custkey and test_join_customer5.phone = test_join_order5.clerk",
joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("custkey", "phone"), ImmutableList.of("custkey", "clerk"), true));
"select * from test_join_customer_order_property join test_join_order_order_property on test_join_customer_order_property.custkey = test_join_order_order_property.custkey and test_join_customer_order_property.phone = test_join_order_order_property.clerk",
joinPlan("test_join_customer_order_property", "test_join_order_order_property", ImmutableList.of("custkey", "phone"), ImmutableList.of("custkey", "clerk"), false));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_join_customer5");
queryRunner.execute("DROP TABLE IF EXISTS test_join_order5");
queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_order_property");
queryRunner.execute("DROP TABLE IF EXISTS test_join_order_order_property");
}
}

/**
 * Verifies that merge join is not chosen when both tables hold more than one partition
 * and the partition key is not part of the join condition.
 */
@Test
public void testMultiplePartitions()
{
    QueryRunner runner = getQueryRunner();

    try {
        // Customer side: bucketed and sorted on custkey, populated with two ds partitions.
        runner.execute("CREATE TABLE test_join_customer_multi_partitions WITH ( \n" +
                " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" +
                " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" +
                "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000");
        runner.execute("INSERT INTO test_join_customer_multi_partitions \n" +
                "SELECT *, '2021-07-12' as ds FROM tpch.sf1.customer LIMIT 1000");

        // Order side: same layout, again with two ds partitions.
        runner.execute("CREATE TABLE test_join_order_multi_partitions WITH ( \n" +
                " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" +
                " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" +
                "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000");
        runner.execute("INSERT INTO test_join_order_multi_partitions \n" +
                "SELECT *, '2021-07-12' as ds FROM tpch.sf1.orders LIMIT 1000");

        // When the partition key does not appear in the join keys and the query reads
        // multiple partitions, merge join can't be enabled.
        Session session = mergeJoinEnabled();
        String joinQuery = "select * from test_join_customer_multi_partitions join test_join_order_multi_partitions on test_join_customer_multi_partitions.custkey = test_join_order_multi_partitions.custkey";
        PlanMatchPattern expectedPlan = joinPlan(
                "test_join_customer_multi_partitions",
                "test_join_order_multi_partitions",
                ImmutableList.of("custkey"),
                ImmutableList.of("custkey"),
                false);
        assertPlan(session, joinQuery, expectedPlan);
    }
    finally {
        runner.execute("DROP TABLE IF EXISTS test_join_customer_multi_partitions");
        runner.execute("DROP TABLE IF EXISTS test_join_order_multi_partitions");
    }
}


/**
 * Session that prefers merge join and has both grouped execution and Hive
 * order-based execution switched on.
 */
private Session mergeJoinEnabled()
{
    return preferMergeJoinSession(true, true);
}

/**
 * Session that prefers merge join but has grouped execution switched off
 * (order-based execution stays on).
 */
private Session missGroupedExecution()
{
    return preferMergeJoinSession(false, true);
}

/**
 * Session that prefers merge join but has Hive order-based execution switched off
 * (grouped execution stays on).
 */
private Session missOrderBasedExecution()
{
    return preferMergeJoinSession(true, false);
}

/**
 * Builds a session derived from the query runner's default session with
 * {@code PREFER_MERGE_JOIN} set to "true" and the two other properties set as requested.
 *
 * @param groupedExecution value for the {@code GROUPED_EXECUTION} system property
 * @param orderBasedExecution value for the Hive catalog's {@code ORDER_BASED_EXECUTION_ENABLED} property
 * @return the configured session
 */
private Session preferMergeJoinSession(boolean groupedExecution, boolean orderBasedExecution)
{
    return Session.builder(getQueryRunner().getDefaultSession())
            .setSystemProperty(PREFER_MERGE_JOIN, "true")
            .setSystemProperty(GROUPED_EXECUTION, String.valueOf(groupedExecution))
            .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, String.valueOf(orderBasedExecution))
            .build();
}

private PlanMatchPattern joinPlan(String leftTableName, String rightTableName, List<String> leftJoinKeys, List<String> rightJoinKeys, boolean mergeJoinEnabled)
{
int suffix1 = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.MergeJoinNode;
import com.facebook.presto.sql.planner.plan.MetadataDeleteNode;
import com.facebook.presto.sql.planner.plan.OutputNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
Expand Down Expand Up @@ -1082,6 +1083,41 @@ public GroupedExecutionProperties visitJoin(JoinNode node, Void context)
}
}

/**
 * Derives the grouped-execution properties of a merge join from those of its two inputs.
 *
 * Both children are always visited. If grouped execution is disabled the node is reported
 * as not capable. If both sides are capable, their lifespan counts must match and the
 * result combines the capable table scans of both sides; it is recovery-eligible only when
 * both sides are. Otherwise the left side's properties are returned unchanged.
 *
 * @param node the merge join node being analyzed
 * @param context unused
 * @return the grouped-execution properties for the subtree rooted at {@code node}
 * @throws IllegalStateException if both sides are capable but report different lifespan counts
 */
@Override
public GroupedExecutionProperties visitMergeJoin(MergeJoinNode node, Void context)
{
    GroupedExecutionProperties left = node.getLeft().accept(this, null);
    GroupedExecutionProperties right = node.getRight().accept(this, null);

    if (!groupedExecutionEnabled) {
        // This is possible when the optimizer is invoked with `forceSingleNode` set to true.
        return GroupedExecutionProperties.notCapable();
    }

    if (left.currentNodeCapable && right.currentNodeCapable) {
        // Use the template overload so the message is only formatted when the check fails.
        checkState(
                left.totalLifespans == right.totalLifespans,
                "Mismatched number of lifespans on left(%s) and right(%s) side of join",
                left.totalLifespans,
                right.totalLifespans);
        return new GroupedExecutionProperties(
                true,
                true,
                ImmutableList.<PlanNodeId>builder()
                        .addAll(left.capableTableScanNodes)
                        .addAll(right.capableTableScanNodes)
                        .build(),
                left.totalLifespans,
                left.recoveryEligible && right.recoveryEligible);
    }

    // NOTE(review): the rationale below mirrors the one used for hash join (it talks about
    // "hash build" buffering and JoinBridgeManager). Confirm that falling back to the left
    // side's properties is also the intended behavior for merge join.
    //
    // right.subTreeUseful && !left.currentNodeCapable:
    //   It's not particularly helpful to do grouped execution on the right side
    //   because the benefit is likely cancelled out due to required buffering for hash build.
    //   In theory, it could still be helpful (e.g. when the underlying aggregation's
    //   intermediate group state may be larger than the aggregation output).
    //   However, this is not currently implemented. JoinBridgeManager needs to support such a lifecycle.
    // !right.currentNodeCapable:
    //   The build/right side needs to buffer fully for this JOIN, but the probe/left side will still stream through.
    //   As a result, there is no reason to change currentNodeCapable or subTreeUseful to false.
    return left;
}

@Override
public GroupedExecutionProperties visitAggregation(AggregationNode node, Void context)
{
Expand Down
Loading