diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index d3f67d4d2fdfe..8d503ddb13fcc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -67,7 +67,7 @@ public class FeaturesConfig private boolean distributedIndexJoinsEnabled; private JoinDistributionType joinDistributionType = PARTITIONED; private DataSize joinMaxBroadcastTableSize; - private boolean colocatedJoinsEnabled; + private boolean colocatedJoinsEnabled = true; private boolean groupedExecutionForAggregationEnabled; private boolean groupedExecutionForEligibleTableScansEnabled; private boolean dynamicScheduleForGroupedExecution; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java index 8bf1c78a41a4f..4469400fb7190 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java @@ -39,6 +39,7 @@ import java.util.function.Function; import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.COALESCE; +import static com.facebook.presto.sql.planner.optimizations.PropertyDerivations.arePartitionHandlesCompatibleForCoalesce; import static com.facebook.presto.sql.relational.OriginalExpressionUtils.castToExpression; import static com.facebook.presto.sql.relational.OriginalExpressionUtils.isExpression; import static com.google.common.base.MoreObjects.toStringHelper; @@ -317,9 +318,9 @@ else if (argument instanceof VariableReferenceExpression) { } // Maps VariableReferenceExpression in both partitions to an COALESCE expression, keeps constant arguments unchanged. - public Optional translateToCoalesce(Partitioning other) + public Optional translateToCoalesce(Partitioning other, Metadata metadata, Session session) { - checkArgument(this.handle.equals(other.handle), "incompatible partitioning handles: %s != %s", this.handle, other.handle); + checkArgument(arePartitionHandlesCompatibleForCoalesce(this.handle, other.handle, metadata, session), "incompatible partitioning handles: cannot coalesce %s and %s", this.handle, other.handle); checkArgument(this.arguments.size() == other.arguments.size(), "incompatible number of partitioning arguments: %s != %s", this.arguments.size(), other.arguments.size()); ImmutableList.Builder arguments = ImmutableList.builder(); for (int i = 0; i < this.arguments.size(); i++) { @@ -341,7 +342,7 @@ else if (leftArgument instanceof VariableReferenceExpression && rightArgument in return Optional.empty(); } } - return Optional.of(new Partitioning(this.handle, arguments.build())); + return Optional.of(new Partitioning(metadata.isRefinedPartitioningOver(session, other.handle, this.handle) ? this.handle : other.handle, arguments.build())); } public Optional translateRowExpression(Map inputToOutputMappings, Map assignments, TypeProvider types) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java index e99882c87b11f..48ae4e9aca6c1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java @@ -443,9 +443,9 @@ public static Global streamPartitionedOn(List strea false); } - public static Global partitionedOnCoalesce(Partitioning one, Partitioning other) + public static Global partitionedOnCoalesce(Partitioning one, Partitioning other, Metadata metadata, Session session) { - return new Global(one.translateToCoalesce(other), Optional.empty(), false); + return new Global(one.translateToCoalesce(other, metadata, session), Optional.empty(), false); } public Global withReplicatedNulls(boolean replicatedNulls) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java index 014067a379071..48db7ed3bd5cc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java @@ -42,6 +42,7 @@ import com.facebook.presto.sql.planner.ExpressionDomainTranslator; import com.facebook.presto.sql.planner.ExpressionInterpreter; import com.facebook.presto.sql.planner.NoOpVariableResolver; +import com.facebook.presto.sql.planner.PartitioningHandle; import com.facebook.presto.sql.planner.RowExpressionInterpreter; import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.optimizations.ActualProperties.Global; @@ -449,9 +450,16 @@ public ActualProperties visitJoin(JoinNode node, List inputPro .build(); } - if (isOptimizeFullOuterJoinWithCoalesce(session) && probeProperties.getNodePartitioning().isPresent() && buildProperties.getNodePartitioning().isPresent()) { + if (isOptimizeFullOuterJoinWithCoalesce(session) && + probeProperties.getNodePartitioning().isPresent() && + buildProperties.getNodePartitioning().isPresent() && + arePartitionHandlesCompatibleForCoalesce( + probeProperties.getNodePartitioning().get().getHandle(), + buildProperties.getNodePartitioning().get().getHandle(), + metadata, + session)) { return ActualProperties.builder() - .global(partitionedOnCoalesce(probeProperties.getNodePartitioning().get(), buildProperties.getNodePartitioning().get())) + .global(partitionedOnCoalesce(probeProperties.getNodePartitioning().get(), buildProperties.getNodePartitioning().get(), metadata, session)) .build(); } @@ -867,4 +875,9 @@ else if (equality.getRight().equals(column) && columns.contains(equality.getLeft return Optional.empty(); } + + public static boolean arePartitionHandlesCompatibleForCoalesce(PartitioningHandle a, PartitioningHandle b, Metadata metadata, Session session) + { + return a.equals(b) || metadata.isRefinedPartitioningOver(session, a, b) || metadata.isRefinedPartitioningOver(session, b, a); + } } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 3171adb57d3c4..5a5e2fa7967bb 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -65,7 +65,7 @@ public void testDefaults() .setMaxStageRetries(0) .setConcurrentLifespansPerTask(0) .setFastInequalityJoins(true) - .setColocatedJoinsEnabled(false) + .setColocatedJoinsEnabled(true) .setSpatialJoinsEnabled(true) .setJoinReorderingStrategy(ELIMINATE_CROSS_JOINS) .setPartialMergePushdownStrategy(FeaturesConfig.PartialMergePushdownStrategy.NONE) @@ -164,7 +164,7 @@ public void testExplicitPropertyMappings() .put("max-stage-retries", "10") .put("concurrent-lifespans-per-task", "1") .put("fast-inequality-joins", "false") - .put("colocated-joins-enabled", "true") + .put("colocated-joins-enabled", "false") .put("spatial-joins-enabled", "false") .put("optimizer.join-reordering-strategy", "NONE") .put("experimental.optimizer.partial-merge-pushdown-strategy", PUSH_THROUGH_LOW_MEMORY_OPERATORS.name()) @@ -241,7 +241,7 @@ public void testExplicitPropertyMappings() .setMaxStageRetries(10) .setConcurrentLifespansPerTask(1) .setFastInequalityJoins(false) - .setColocatedJoinsEnabled(true) + .setColocatedJoinsEnabled(false) .setSpatialJoinsEnabled(false) .setJoinReorderingStrategy(NONE) .setPartialMergePushdownStrategy(PUSH_THROUGH_LOW_MEMORY_OPERATORS)