From 588dbcad50429afc2228e62b3ece0288fdbb6083 Mon Sep 17 00:00:00 2001 From: Youngwb Date: Fri, 15 Apr 2022 10:24:15 +0800 Subject: [PATCH] Simplify the isColocateJoin function in PlanFragmentBuilder (#5091) --- .../sql/plan/PlanFragmentBuilder.java | 74 ++++--------------- 1 file changed, 13 insertions(+), 61 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java index 94900d4b9d43b7..795106772b14b2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java @@ -1473,7 +1473,7 @@ public PlanFragment visitPhysicalHashJoin(OptExpression optExpr, ExecPlan contex distributionMode = HashJoinNode.DistributionMode.BROADCAST; } else if (!(leftFragmentPlanRoot instanceof ExchangeNode) && !(rightFragmentPlanRoot instanceof ExchangeNode)) { - if (isColocateJoin(optExpr, context, leftFragmentPlanRoot, rightFragmentPlanRoot)) { + if (isColocateJoin(optExpr)) { distributionMode = HashJoinNode.DistributionMode.COLOCATE; } else if (ConnectContext.get().getSessionVariable().isEnableReplicationJoin() && rightFragmentPlanRoot.canDoReplicatedJoin()) { @@ -1661,66 +1661,18 @@ public PlanFragment visitPhysicalHashJoin(OptExpression optExpr, ExecPlan contex } } - private void collectOlapScanInFragment(OptExpression optExpression, - List scanNodeList) { - Operator operator = optExpression.getOp(); - if (operator instanceof PhysicalOlapScanOperator) { - scanNodeList.add((PhysicalOlapScanOperator) operator); - return; - } - if (operator instanceof PhysicalDistributionOperator) { - return; - } - for (OptExpression child : optExpression.getInputs()) { - collectOlapScanInFragment(child, scanNodeList); - } - } - - private boolean isColocateJoin(OptExpression optExpression, ExecPlan context, PlanNode left, PlanNode right) { - List rightScanNodes = Lists.newArrayList(); - collectOlapScanInFragment(optExpression.inputAt(1), rightScanNodes); - - PhysicalHashJoinOperator joinNode = (PhysicalHashJoinOperator) optExpression.getOp(); - List leftScanNodes = Lists.newArrayList(); - collectOlapScanInFragment(optExpression.inputAt(0), leftScanNodes); - - ColumnRefSet leftChildColumns = optExpression.getInputs().get(0).getOutputColumns(); - ColumnRefSet rightChildColumns = optExpression.getInputs().get(1).getOutputColumns(); - List equalOnPredicate = - JoinPredicateUtils.getEqConj(leftChildColumns, rightChildColumns, - Utils.extractConjuncts(joinNode.getOnPredicate())); - - List leftOnPredicateColumns = new ArrayList<>(); - List rightOnPredicateColumns = new ArrayList<>(); - JoinPredicateUtils.getJoinOnPredicatesColumns(equalOnPredicate, leftChildColumns, rightChildColumns, - leftOnPredicateColumns, rightOnPredicateColumns); - - boolean leftChildSatisfied = leftScanNodes.stream().anyMatch(olapScanNode -> leftOnPredicateColumns - .containsAll(olapScanNode.getDistributionSpec().getHashDistributionDesc().getColumns())); - - boolean rightChildSatisfied = rightScanNodes.stream().anyMatch(olapScanNode -> rightOnPredicateColumns - .containsAll(olapScanNode.getDistributionSpec().getHashDistributionDesc().getColumns())); - if (!leftChildSatisfied || !rightChildSatisfied) { - return false; - } - - ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); - for (PhysicalOlapScanOperator node : leftScanNodes) { - List outputColumns = - node.getOutputColumns().stream().map(ColumnRefOperator::getId).collect(Collectors.toList()); - if (outputColumns.containsAll(leftOnPredicateColumns)) { - boolean isColocateGroup = colocateIndex - .isSameGroup(node.getTable().getId(), rightScanNodes.get(0).getTable().getId()); - if (node.getTable().getId() == rightScanNodes.get(0).getTable().getId() && - !isColocateGroup) { - return true; - } else { - return isColocateGroup && - !colocateIndex.isGroupUnstable(colocateIndex.getGroup(node.getTable().getId())); - } - } - } - return false; + private boolean isColocateJoin(OptExpression optExpression) { + // through the required properties type check if it is colocate join + return optExpression.getRequiredProperties().stream().allMatch( + physicalPropertySet -> { + if (!physicalPropertySet.getDistributionProperty().isShuffle()) { + return false; + } + HashDistributionDesc.SourceType hashSourceType = + ((HashDistributionSpec) (physicalPropertySet.getDistributionProperty().getSpec())) + .getHashDistributionDesc().getSourceType(); + return hashSourceType.equals(HashDistributionDesc.SourceType.LOCAL); + }); } public boolean isShuffleJoin(OptExpression optExpression) {