Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] colocate join need consider column equivalent conduction(backport #13344) #13546

Merged
merged 3 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.starrocks.catalog.ColocateTableIndex;
import com.starrocks.common.Pair;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.optimizer.base.ColumnRefSet;
import com.starrocks.sql.optimizer.base.DistributionProperty;
import com.starrocks.sql.optimizer.base.DistributionSpec;
import com.starrocks.sql.optimizer.base.HashDistributionDesc;
Expand All @@ -21,6 +24,7 @@
import com.starrocks.sql.optimizer.task.TaskContext;

import java.util.List;
import java.util.Set;

public class ChildOutputPropertyGuarantor extends PropertyDeriverBase<Void, ExpressionContext> {
private PhysicalPropertySet requirements;
Expand Down Expand Up @@ -96,16 +100,27 @@ public boolean canColocateJoin(HashDistributionSpec leftLocalDistributionSpec,
Preconditions.checkState(leftLocalDistributionDesc.getColumns().size() ==
rightLocalDistributionDesc.getColumns().size());
}
// check orders of predicate columns is right
// check predicate columns is satisfy bucket hash columns

// The order of equivalence predicates(shuffle columns are derived from them) is
// meaningless, hence it is correct to use a set to save these shuffle pairs. According
// to the distribution column information of the left and right children, we can build
// distribution pairs. We can use colocate join is judged by whether all the distribution
// pairs are exist in the equivalent predicates set.
Set<Pair<Integer, Integer>> shufflePairs = Sets.newHashSet();
for (int i = 0; i < leftShuffleColumns.size(); i++) {
shufflePairs.add(Pair.create(leftShuffleColumns.get(i), rightShuffleColumns.get(i)));
}

for (int i = 0; i < leftLocalDistributionDesc.getColumns().size(); ++i) {
int leftScanColumnId = leftLocalDistributionDesc.getColumns().get(i);
int leftIndex = leftShuffleColumns.indexOf(leftScanColumnId);
ColumnRefSet leftEquivalentCols = leftLocalDistributionSpec.getPropertyInfo()
.getEquivalentColumns(leftScanColumnId);

int rightScanColumnId = rightLocalDistributionDesc.getColumns().get(i);
int rightIndex = rightShuffleColumns.indexOf(rightScanColumnId);
ColumnRefSet rightEquivalentCols = rightLocalDistributionSpec.getPropertyInfo()
.getEquivalentColumns(rightScanColumnId);

if (leftIndex != rightIndex) {
if (!isDistributionPairExist(shufflePairs, leftEquivalentCols, rightEquivalentCols)) {
return false;
}
}
Expand Down Expand Up @@ -325,4 +340,18 @@ public Void visitPhysicalJoin(PhysicalJoinOperator node, ExpressionContext conte

return visitOperator(node, context);
}

private boolean isDistributionPairExist(Set<Pair<Integer, Integer>> shufflePairs,
ColumnRefSet leftEquivalentCols,
ColumnRefSet rightEquivalentCols) {
for (int leftCol : leftEquivalentCols.getColumnIds()) {
for (int rightCol : rightEquivalentCols.getColumnIds()) {
Pair<Integer, Integer> distributionPair = Pair.create(leftCol, rightCol);
if (shufflePairs.contains(distributionPair)) {
return true;
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

package com.starrocks.sql.plan;

import com.google.common.collect.Lists;
import com.starrocks.common.FeConstants;
import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.List;

public class ColocateJoinTest extends PlanTestBase {

@BeforeClass
public static void beforeClass() throws Exception {
PlanTestBase.beforeClass();
FeConstants.runningUnitTest = true;
starRocksAssert.withTable("CREATE TABLE `colocate_t2_1` (\n" +
" `v7` bigint NULL COMMENT \"\",\n" +
" `v8` bigint NULL COMMENT \"\",\n" +
" `v9` bigint NULL\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`v7`, `v8`, v9)\n" +
"DISTRIBUTED BY HASH(`v7`) BUCKETS 3\n" +
"PROPERTIES (\n" +
"\"replication_num\" = \"1\",\n" +
"\"in_memory\" = \"false\",\n" +
"\"storage_format\" = \"DEFAULT\",\n" +
"\"colocate_with\" = \"colocate_group_1\"" +
");");
}

@Test
public void testColocateJoinOnce() throws Exception {
List<String> sqls = Lists.newArrayList();
sqls.add("select * from colocate_t0 join colocate_t1 on v1 = v5 and v1 = v4");
sqls.add("select * from colocate_t0 join colocate_t1 on v2 = v4 and v1 = v4");
sqls.add("select * from colocate_t0 join colocate_t1 on v1 + v2 = v4 + v5 and v1 = v4 + 1 and v1 = v4");
sqls.add("select * from colocate_t0, colocate_t1 where v1 = v5 and v1 = v4");
sqls.add("select * from colocate_t0, colocate_t1 where v2 = v4 and v1 = v4");
sqls.add("select * from colocate_t0, colocate_t1 where v1 + v2 = v4 + v5 and v1 = v4 + 1 and v1 = v4");

sqls.add("select * from colocate_t0, colocate_t1, colocate_t2_1 where v1 = v5 and v5 = v7");
for (String sql : sqls) {
String plan = getFragmentPlan(sql);
int count = StringUtils.countMatches(plan, "INNER JOIN (COLOCATE)");
Assert.assertEquals(plan, 1, count);
}

}

@Test
public void testColocateJoinTwice() throws Exception {
List<String> sqls = Lists.newArrayList();
sqls.add("select * from colocate_t0 join colocate_t1 on v1 = v4 join colocate_t2_1 on v4 = v7");
sqls.add("select * from colocate_t0 join colocate_t1 on v1 = v5 and v1 = v4 join colocate_t2_1 on v5 = v7 and v7 = v2");
sqls.add("select * from colocate_t0 join colocate_t1 on v1 = v5 join colocate_t2_1 on v1 = v4 and v1 = v7");


sqls.add("select * from colocate_t0, colocate_t1, colocate_t2_1 where v1 = v4 and v4 = v7");
sqls.add("select * from colocate_t0, colocate_t1, colocate_t2_1 where v1 = v5 and v1 = v4 and v5 = v7 and v7 = v2");
sqls.add("select * from colocate_t0, colocate_t1, colocate_t2_1 where v1 = v5 and v1 = v4 and v1 = v7");
for (String sql : sqls) {
String plan = getFragmentPlan(sql);
int count = StringUtils.countMatches(plan, "INNER JOIN (COLOCATE)");
Assert.assertEquals(plan, 2, count);
}

}
}
84 changes: 18 additions & 66 deletions fe/fe-core/src/test/resources/sql/tpch/q2.sql
Original file line number Diff line number Diff line change
@@ -1,48 +1,3 @@
[sql]
select
s_acctbal,
s_name,
n_name,
p_partkey,
p_mfgr,
s_address,
s_phone,
s_comment
from
part,
supplier,
partsupp,
nation,
region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and p_size = 12
and p_type like '%COPPER'
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'AMERICA'
and ps_supplycost = (
select
min(ps_supplycost)
from
partsupp,
supplier,
nation,
region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'AMERICA'
)
order by
s_acctbal desc,
n_name,
s_name,
p_partkey limit 100;
[result]
TOP-N (order by [[16: S_ACCTBAL DESC NULLS LAST, 26: N_NAME ASC NULLS FIRST, 12: S_NAME ASC NULLS FIRST, 1: P_PARTKEY ASC NULLS FIRST]])
TOP-N (order by [[16: S_ACCTBAL DESC NULLS LAST, 26: N_NAME ASC NULLS FIRST, 12: S_NAME ASC NULLS FIRST, 1: P_PARTKEY ASC NULLS FIRST]])
INNER JOIN (join-predicate [30: R_REGIONKEY = 27: N_REGIONKEY] post-join-predicate [null])
Expand All @@ -53,24 +8,21 @@ TOP-N (order by [[16: S_ACCTBAL DESC NULLS LAST, 26: N_NAME ASC NULLS FIRST, 12:
EXCHANGE BROADCAST
INNER JOIN (join-predicate [11: S_SUPPKEY = 20: PS_SUPPKEY] post-join-predicate [null])
SCAN (columns[17: S_COMMENT, 11: S_SUPPKEY, 12: S_NAME, 13: S_ADDRESS, 14: S_NATIONKEY, 15: S_PHONE, 16: S_ACCTBAL] predicate[null])
EXCHANGE BROADCAST
INNER JOIN (join-predicate [22: PS_SUPPLYCOST = 57: min AND 19: PS_PARTKEY = 1: P_PARTKEY] post-join-predicate [null])
SCAN (columns[19: PS_PARTKEY, 20: PS_SUPPKEY, 22: PS_SUPPLYCOST] predicate[null])
EXCHANGE SHUFFLE[1]
INNER JOIN (join-predicate [1: P_PARTKEY = 34: PS_PARTKEY] post-join-predicate [null])
SCAN (columns[1: P_PARTKEY, 3: P_MFGR, 5: P_TYPE, 6: P_SIZE] predicate[6: P_SIZE = 12 AND 5: P_TYPE LIKE %COPPER])
EXCHANGE SHUFFLE[34]
AGGREGATE ([GLOBAL] aggregate [{57: min=min(57: min)}] group by [[34: PS_PARTKEY]] having [null]
AGGREGATE ([LOCAL] aggregate [{57: min=min(37: PS_SUPPLYCOST)}] group by [[34: PS_PARTKEY]] having [null]
INNER JOIN (join-predicate [35: PS_SUPPKEY = 40: S_SUPPKEY] post-join-predicate [null])
SCAN (columns[34: PS_PARTKEY, 35: PS_SUPPKEY, 37: PS_SUPPLYCOST] predicate[null])
EXCHANGE BROADCAST
INNER JOIN (join-predicate [43: S_NATIONKEY = 48: N_NATIONKEY] post-join-predicate [null])
SCAN (columns[40: S_SUPPKEY, 43: S_NATIONKEY] predicate[null])
EXCHANGE BROADCAST
INNER JOIN (join-predicate [50: N_REGIONKEY = 53: R_REGIONKEY] post-join-predicate [null])
SCAN (columns[50: N_REGIONKEY, 48: N_NATIONKEY] predicate[null])
EXCHANGE BROADCAST
SCAN (columns[53: R_REGIONKEY, 54: R_NAME] predicate[54: R_NAME = AMERICA])
[end]

EXCHANGE SHUFFLE[20]
INNER JOIN (join-predicate [57: min = 22: PS_SUPPLYCOST AND 34: PS_PARTKEY = 1: P_PARTKEY] post-join-predicate [null])
AGGREGATE ([GLOBAL] aggregate [{57: min=min(57: min)}] group by [[34: PS_PARTKEY]] having [null]
AGGREGATE ([LOCAL] aggregate [{57: min=min(37: PS_SUPPLYCOST)}] group by [[34: PS_PARTKEY]] having [null]
INNER JOIN (join-predicate [35: PS_SUPPKEY = 40: S_SUPPKEY] post-join-predicate [null])
SCAN (columns[34: PS_PARTKEY, 35: PS_SUPPKEY, 37: PS_SUPPLYCOST] predicate[null])
EXCHANGE BROADCAST
INNER JOIN (join-predicate [43: S_NATIONKEY = 48: N_NATIONKEY] post-join-predicate [null])
SCAN (columns[40: S_SUPPKEY, 43: S_NATIONKEY] predicate[null])
EXCHANGE BROADCAST
INNER JOIN (join-predicate [50: N_REGIONKEY = 53: R_REGIONKEY] post-join-predicate [null])
SCAN (columns[50: N_REGIONKEY, 48: N_NATIONKEY] predicate[null])
EXCHANGE BROADCAST
SCAN (columns[53: R_REGIONKEY, 54: R_NAME] predicate[54: R_NAME = AMERICA])
INNER JOIN (join-predicate [19: PS_PARTKEY = 1: P_PARTKEY] post-join-predicate [null])
SCAN (columns[19: PS_PARTKEY, 20: PS_SUPPKEY, 22: PS_SUPPLYCOST] predicate[null])
EXCHANGE SHUFFLE[1]
SCAN (columns[1: P_PARTKEY, 3: P_MFGR, 5: P_TYPE, 6: P_SIZE] predicate[6: P_SIZE = 12 AND 5: P_TYPE LIKE %COPPER])
Loading