Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class FeaturesConfig
private boolean distributedIndexJoinsEnabled;
private JoinDistributionType joinDistributionType = PARTITIONED;
private DataSize joinMaxBroadcastTableSize;
private boolean colocatedJoinsEnabled;
private boolean colocatedJoinsEnabled = true;
private boolean groupedExecutionForAggregationEnabled;
private boolean groupedExecutionForEligibleTableScansEnabled;
private boolean dynamicScheduleForGroupedExecution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.function.Function;

import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.COALESCE;
import static com.facebook.presto.sql.planner.optimizations.PropertyDerivations.arePartitionHandlesCompatibleForCoalesce;
import static com.facebook.presto.sql.relational.OriginalExpressionUtils.castToExpression;
import static com.facebook.presto.sql.relational.OriginalExpressionUtils.isExpression;
import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -317,9 +318,9 @@ else if (argument instanceof VariableReferenceExpression) {
}

// Maps VariableReferenceExpression in both partitions to an COALESCE expression, keeps constant arguments unchanged.
public Optional<Partitioning> translateToCoalesce(Partitioning other)
public Optional<Partitioning> translateToCoalesce(Partitioning other, Metadata metadata, Session session)
{
checkArgument(this.handle.equals(other.handle), "incompatible partitioning handles: %s != %s", this.handle, other.handle);
checkArgument(arePartitionHandlesCompatibleForCoalesce(this.handle, other.handle, metadata, session), "incompatible partitioning handles: cannot coalesce %s and %s", this.handle, other.handle);
checkArgument(this.arguments.size() == other.arguments.size(), "incompatible number of partitioning arguments: %s != %s", this.arguments.size(), other.arguments.size());
ImmutableList.Builder<RowExpression> arguments = ImmutableList.builder();
for (int i = 0; i < this.arguments.size(); i++) {
Expand All @@ -341,7 +342,7 @@ else if (leftArgument instanceof VariableReferenceExpression && rightArgument in
return Optional.empty();
}
}
return Optional.of(new Partitioning(this.handle, arguments.build()));
return Optional.of(new Partitioning(metadata.isRefinedPartitioningOver(session, other.handle, this.handle) ? this.handle : other.handle, arguments.build()));
}

public Optional<Partitioning> translateRowExpression(Map<VariableReferenceExpression, RowExpression> inputToOutputMappings, Map<VariableReferenceExpression, RowExpression> assignments, TypeProvider types)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,9 @@ public static Global streamPartitionedOn(List<VariableReferenceExpression> strea
false);
}

public static Global partitionedOnCoalesce(Partitioning one, Partitioning other)
public static Global partitionedOnCoalesce(Partitioning one, Partitioning other, Metadata metadata, Session session)
{
return new Global(one.translateToCoalesce(other), Optional.empty(), false);
return new Global(one.translateToCoalesce(other, metadata, session), Optional.empty(), false);
}

public Global withReplicatedNulls(boolean replicatedNulls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.facebook.presto.sql.planner.ExpressionDomainTranslator;
import com.facebook.presto.sql.planner.ExpressionInterpreter;
import com.facebook.presto.sql.planner.NoOpVariableResolver;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.facebook.presto.sql.planner.RowExpressionInterpreter;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.ActualProperties.Global;
Expand Down Expand Up @@ -449,9 +450,16 @@ public ActualProperties visitJoin(JoinNode node, List<ActualProperties> inputPro
.build();
}

if (isOptimizeFullOuterJoinWithCoalesce(session) && probeProperties.getNodePartitioning().isPresent() && buildProperties.getNodePartitioning().isPresent()) {
if (isOptimizeFullOuterJoinWithCoalesce(session) &&
probeProperties.getNodePartitioning().isPresent() &&
buildProperties.getNodePartitioning().isPresent() &&
arePartitionHandlesCompatibleForCoalesce(
probeProperties.getNodePartitioning().get().getHandle(),
buildProperties.getNodePartitioning().get().getHandle(),
metadata,
session)) {
return ActualProperties.builder()
.global(partitionedOnCoalesce(probeProperties.getNodePartitioning().get(), buildProperties.getNodePartitioning().get()))
.global(partitionedOnCoalesce(probeProperties.getNodePartitioning().get(), buildProperties.getNodePartitioning().get(), metadata, session))
.build();
}

Expand Down Expand Up @@ -867,4 +875,9 @@ else if (equality.getRight().equals(column) && columns.contains(equality.getLeft

return Optional.empty();
}

public static boolean arePartitionHandlesCompatibleForCoalesce(PartitioningHandle a, PartitioningHandle b, Metadata metadata, Session session)
{
return a.equals(b) || metadata.isRefinedPartitioningOver(session, a, b) || metadata.isRefinedPartitioningOver(session, b, a);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testDefaults()
.setMaxStageRetries(0)
.setConcurrentLifespansPerTask(0)
.setFastInequalityJoins(true)
.setColocatedJoinsEnabled(false)
.setColocatedJoinsEnabled(true)
.setSpatialJoinsEnabled(true)
.setJoinReorderingStrategy(ELIMINATE_CROSS_JOINS)
.setPartialMergePushdownStrategy(FeaturesConfig.PartialMergePushdownStrategy.NONE)
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testExplicitPropertyMappings()
.put("max-stage-retries", "10")
.put("concurrent-lifespans-per-task", "1")
.put("fast-inequality-joins", "false")
.put("colocated-joins-enabled", "true")
.put("colocated-joins-enabled", "false")
.put("spatial-joins-enabled", "false")
.put("optimizer.join-reordering-strategy", "NONE")
.put("experimental.optimizer.partial-merge-pushdown-strategy", PUSH_THROUGH_LOW_MEMORY_OPERATORS.name())
Expand Down Expand Up @@ -241,7 +241,7 @@ public void testExplicitPropertyMappings()
.setMaxStageRetries(10)
.setConcurrentLifespansPerTask(1)
.setFastInequalityJoins(false)
.setColocatedJoinsEnabled(true)
.setColocatedJoinsEnabled(false)
.setSpatialJoinsEnabled(false)
.setJoinReorderingStrategy(NONE)
.setPartialMergePushdownStrategy(PUSH_THROUGH_LOW_MEMORY_OPERATORS)
Expand Down