Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ public PlanOptimizers(
// MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation.
// It should be placed after AddExchanges but before AddLocalExchanges, so that the JoinNode
// is replaced with a MergeJoin before AddLocalExchanges would add an extra local exchange.
builder.add(new MergeJoinForSortedInputOptimizer(metadata, sqlParser));
builder.add(new MergeJoinForSortedInputOptimizer(metadata, sqlParser, featuresConfig.isNativeExecutionEnabled()));

// Optimizers above this don't understand local exchanges, so be careful moving this.
builder.add(new AddLocalExchanges(metadata, sqlParser, featuresConfig.isNativeExecutionEnabled()));
Expand Down Expand Up @@ -934,7 +934,7 @@ public PlanOptimizers(
statsCalculator,
costCalculator,
ImmutableList.of(),
ImmutableSet.of(new RuntimeReorderJoinSides(metadata, sqlParser))));
ImmutableSet.of(new RuntimeReorderJoinSides(metadata, sqlParser, featuresConfig.isNativeExecutionEnabled()))));
this.runtimeOptimizers = runtimeBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
Lookup lookup,
Session session,
VariableAllocator variableAllocator,
PlanNodeIdAllocator idAllocator)
PlanNodeIdAllocator idAllocator,
boolean nativeExecution)
{
JoinNode swapped = joinNode.flipChildren();

Expand All @@ -81,7 +82,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
PlanNode resolvedSwappedLeft = lookup.resolve(newLeft);
if (resolvedSwappedLeft instanceof ExchangeNode && resolvedSwappedLeft.getSources().size() == 1) {
// Ensure the new probe after skipping the local exchange will satisfy the required probe side property
if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, parser, lookup, session, variableAllocator)) {
if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, parser, lookup, session, variableAllocator, nativeExecution)) {
newLeft = resolvedSwappedLeft.getSources().get(0);
// The HashGenerationOptimizer will generate hashVariables and append to the output layout of the nodes following the same order. Therefore,
// we use the index of the old hashVariable in the ExchangeNode output layout to retrieve the hashVariable from the new left node, and feed
Expand All @@ -105,7 +106,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
.map(EquiJoinClause::getRight)
.collect(toImmutableList());
PlanNode newRight = swapped.getRight();
if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, parser, lookup, session, variableAllocator)) {
if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, parser, lookup, session, variableAllocator, nativeExecution)) {
if (getTaskConcurrency(session) > 1) {
newRight = systemPartitionedExchange(
idAllocator.getNextId(),
Expand Down Expand Up @@ -137,7 +138,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
}

// Check if the new probe side after removing unnecessary local exchange is valid.
public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, SqlParser parser, Lookup lookup, Session session, VariableAllocator variableAllocator)
public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, SqlParser parser, Lookup lookup, Session session, VariableAllocator variableAllocator, boolean nativeExecution)
{
StreamPreferredProperties requiredProbeProperty;
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
Expand All @@ -146,7 +147,7 @@ public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata me
else {
requiredProbeProperty = defaultParallelism(session);
}
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator);
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator, nativeExecution);
return requiredProbeProperty.isSatisfiedBy(nodeProperty);
}

Expand All @@ -158,7 +159,8 @@ private static boolean checkBuildSidePropertySatisfied(
SqlParser parser,
Lookup lookup,
Session session,
VariableAllocator variableAllocator)
VariableAllocator variableAllocator,
boolean nativeExecution)
{
StreamPreferredProperties requiredBuildProperty;
if (getTaskConcurrency(session) > 1) {
Expand All @@ -167,7 +169,7 @@ private static boolean checkBuildSidePropertySatisfied(
else {
requiredBuildProperty = singleStream();
}
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator);
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator, nativeExecution);
return requiredBuildProperty.isSatisfiedBy(nodeProperty);
}

Expand All @@ -177,13 +179,14 @@ private static StreamPropertyDerivations.StreamProperties derivePropertiesRecurs
SqlParser parser,
Lookup lookup,
Session session,
VariableAllocator variableAllocator)
VariableAllocator variableAllocator,
boolean nativeExecution)
{
PlanNode actual = lookup.resolve(node);
List<StreamPropertyDerivations.StreamProperties> inputProperties = actual.getSources().stream()
.map(source -> derivePropertiesRecursively(source, metadata, parser, lookup, session, variableAllocator))
.map(source -> derivePropertiesRecursively(source, metadata, parser, lookup, session, variableAllocator, nativeExecution))
.collect(toImmutableList());
return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session, TypeProvider.viewOf(variableAllocator.getVariables()), parser);
return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session, TypeProvider.viewOf(variableAllocator.getVariables()), parser, nativeExecution);
}

public static boolean isBelowBroadcastLimit(PlanNode planNode, Rule.Context context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,20 @@ public class RuntimeReorderJoinSides

private final Metadata metadata;
private final SqlParser parser;
private final boolean nativeExecution;

/**
 * Creates the rule with native execution disabled.
 *
 * <p>Delegates to the three-argument constructor so the null checks and field
 * initialisation live in one place and the two constructors cannot drift apart.
 *
 * @param metadata metadata handle; must not be null
 * @param parser SQL parser; must not be null
 */
public RuntimeReorderJoinSides(Metadata metadata, SqlParser parser)
{
    this(metadata, parser, false);
}

/**
 * Creates the rule with an explicit native-execution setting.
 *
 * @param metadata metadata handle; must not be null
 * @param parser SQL parser; must not be null
 * @param nativeExecution stored as-is and passed on to {@code createRuntimeSwappedJoinNode}
 *        when the rule is applied
 */
public RuntimeReorderJoinSides(Metadata metadata, SqlParser parser, boolean nativeExecution)
{
// metadata is checked before parser, so a null metadata is reported first.
this.metadata = requireNonNull(metadata, "metadata is null");
this.parser = requireNonNull(parser, "parser is null");
this.nativeExecution = nativeExecution;
}

@Override
Expand Down Expand Up @@ -100,7 +109,7 @@ public Result apply(JoinNode joinNode, Captures captures, Context context)
return Result.empty();
}

Optional<JoinNode> rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, parser, context.getLookup(), context.getSession(), context.getVariableAllocator(), context.getIdAllocator());
Optional<JoinNode> rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, parser, context.getLookup(), context.getSession(), context.getVariableAllocator(), context.getIdAllocator(), nativeExecution);
if (rewrittenNode.isPresent()) {
log.debug(format("Probe size: %.2f is smaller than Build size: %.2f => invoke runtime join swapping on JoinNode ID: %s.", leftOutputSizeInBytes, rightOutputSizeInBytes, joinNode.getId()));
return Result.ofPlanNode(rewrittenNode.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, StreamPreferredProp
parentPreferences.constrainTo(node.getProbeSource().getOutputVariables()).withDefaultParallelism(session));

// index source does not support local parallel and must produce a single stream
StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session, types, parser);
StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session, types, parser, nativeExecution);
checkArgument(indexStreamProperties.getDistribution() == SINGLE, "index source must be single stream");
PlanWithProperties index = new PlanWithProperties(node.getIndexSource(), indexStreamProperties);

Expand Down Expand Up @@ -933,12 +933,12 @@ private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, List<PlanWit

private PlanWithProperties deriveProperties(PlanNode result, StreamProperties inputProperties)
{
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, types, parser));
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, types, parser, nativeExecution));
}

private PlanWithProperties deriveProperties(PlanNode result, List<StreamProperties> inputProperties)
{
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, types, parser));
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, types, parser, nativeExecution));
}

private PlanWithProperties accept(PlanNode node, StreamPreferredProperties context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ public class MergeJoinForSortedInputOptimizer
private final Metadata metadata;
private final SqlParser parser;
private boolean isEnabledForTesting;
private final boolean nativeExecution;

public MergeJoinForSortedInputOptimizer(Metadata metadata, SqlParser parser)
public MergeJoinForSortedInputOptimizer(Metadata metadata, SqlParser parser, boolean nativeExecution)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.parser = requireNonNull(parser, "parser is null");
this.nativeExecution = nativeExecution;
}

@Override
Expand Down Expand Up @@ -141,8 +143,8 @@ public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
private boolean meetsDataRequirement(PlanNode left, PlanNode right, JoinNode node)
{
// Acquire data properties for both left and right side
StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, types, parser);
StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, types, parser);
StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, types, parser, nativeExecution);
StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, types, parser, nativeExecution);

List<VariableReferenceExpression> leftJoinColumns = node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList());
List<VariableReferenceExpression> rightJoinColumns = node.getCriteria().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,20 @@ public final class StreamPropertyDerivations
{
private StreamPropertyDerivations() {}

public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session, TypeProvider types, SqlParser parser)
public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session, TypeProvider types, SqlParser parser, boolean nativeExecution)
{
List<StreamProperties> inputProperties = node.getSources().stream()
.map(source -> derivePropertiesRecursively(source, metadata, session, types, parser))
.map(source -> derivePropertiesRecursively(source, metadata, session, types, parser, nativeExecution))
.collect(toImmutableList());
return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session, types, parser);
return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session, types, parser, nativeExecution);
}

public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser)
public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser, boolean nativeExecution)
{
return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session, types, parser);
return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session, types, parser, nativeExecution);
}

public static StreamProperties deriveProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser)
public static StreamProperties deriveProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser, boolean nativeExecution)
{
requireNonNull(node, "node is null");
requireNonNull(inputProperties, "inputProperties is null");
Expand All @@ -133,7 +133,7 @@ public static StreamProperties deriveProperties(PlanNode node, List<StreamProper
types,
parser);

StreamProperties result = node.accept(new Visitor(metadata, session, types), inputProperties)
StreamProperties result = node.accept(new Visitor(metadata, session, types, nativeExecution), inputProperties)
.withOtherActualProperties(otherProperties);

result.getPartitioningColumns().ifPresent(columns ->
Expand All @@ -154,12 +154,14 @@ private static class Visitor
private final Metadata metadata;
private final Session session;
private final TypeProvider types;
private final boolean nativeExecution;

private Visitor(Metadata metadata, Session session, TypeProvider types)
private Visitor(Metadata metadata, Session session, TypeProvider types, boolean nativeExecution)
{
this.metadata = metadata;
this.session = session;
this.types = types;
this.nativeExecution = nativeExecution;
}

@Override
Expand All @@ -185,9 +187,16 @@ public StreamProperties visitJoin(JoinNode node, List<StreamProperties> inputPro
.translate(column -> PropertyDerivations.filterOrRewrite(outputs, node.getCriteria(), column))
.unordered(unordered);
case LEFT:
return leftProperties
.translate(column -> PropertyDerivations.filterIfMissing(outputs, column))
.unordered(unordered);
if (nativeExecution && node.getCriteria().isEmpty()) {
// This maps to a NestedLoopJoin in Native engine. The NestedLoopJoin output is not

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel @feilong-liu Rebecca, Feilong, would you help take a first pass at this change? At a high level this seems reasonable, but I haven't worked in the optimizer code for quite some time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does only a left join with no criteria produce a nested loop join? What happens for an inner join? Also, would it be enough to just change unordered to false for these cases and otherwise keep everything else the same (does the stream distribution or partitioning change)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel: That's a good point. I was trying to keep this fix contained, but the change would apply to inner joins too. The issue came up in correlated queries, which seem to use LEFT joins predominantly.

But just changing unordered to false didn't work. I had to change the partitioning as well for the change to take effect. AddLocalExchanges seems to rely on the distribution as well. More at #22585 (comment)

// partitioned or ordered.
return new StreamProperties(MULTIPLE, Optional.empty(), false);
}
else {
return leftProperties
.translate(column -> PropertyDerivations.filterIfMissing(outputs, column))
.unordered(unordered);
}
case RIGHT:
// since this is a right join, none of the matched output rows will contain nulls
// in the left partitioning columns, and all of the unmatched rows will have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode)
new TypeValidator(),
new VerifyNoFilteredAggregations(),
new VerifyNoIntermediateFormExpression(),
new ValidateStreamingJoins())
new ValidateStreamingJoins(featuresConfig.isNativeExecutionEnabled()))
.putAll(
Stage.FINAL,
new CheckUnsupportedExternalFunctions(),
Expand All @@ -64,8 +64,8 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode)
new TypeValidator(),
new VerifyOnlyOneOutputNode(),
new VerifyNoFilteredAggregations(),
new ValidateAggregationsWithDefaultValues(forceSingleNode),
new ValidateStreamingAggregations(),
new ValidateAggregationsWithDefaultValues(forceSingleNode, featuresConfig.isNativeExecutionEnabled()),
new ValidateStreamingAggregations(featuresConfig.isNativeExecutionEnabled()),
new VerifyNoIntermediateFormExpression(),
new VerifyProjectionLocality(),
new DynamicFiltersChecker(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,18 @@ public class ValidateAggregationsWithDefaultValues
{
private final boolean forceSingleNode;

private final boolean nativeExecution;

/**
 * Creates the validator with native execution disabled.
 *
 * <p>Delegates to the two-argument constructor so field initialisation lives in
 * one place and the two constructors cannot drift apart.
 *
 * @param forceSingleNode stored on the validator as-is
 */
public ValidateAggregationsWithDefaultValues(boolean forceSingleNode)
{
    this(forceSingleNode, false);
}

/**
 * Creates the validator with an explicit native-execution setting.
 *
 * @param forceSingleNode stored on the validator as-is
 * @param nativeExecution stored as-is and forwarded to
 *        {@code StreamPropertyDerivations.derivePropertiesRecursively} when aggregations are visited
 */
public ValidateAggregationsWithDefaultValues(boolean forceSingleNode, boolean nativeExecution)
{
this.forceSingleNode = forceSingleNode;
this.nativeExecution = nativeExecution;
}

@Override
Expand Down Expand Up @@ -126,7 +135,7 @@ public Optional<SeenExchanges> visitAggregation(AggregationNode node, Void conte
if (!seenExchanges.localRepartitionExchange) {
// No local repartition exchange between final and partial aggregation.
// Make sure that final aggregation operators are executed by single thread.
StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session, types, parser);
StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session, types, parser, nativeExecution);
checkArgument(localProperties.isSingleStream(),
"Final aggregation with default value not separated from partial aggregation by local hash exchange");
}
Expand Down
Loading