diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index 6bff688e2a892..5bb9d3b9b89ba 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -872,7 +872,7 @@ public PlanOptimizers( // MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation // Should be placed after AddExchanges, but before AddLocalExchange // To replace the JoinNode to MergeJoin ahead of AddLocalExchange to avoid adding extra local exchange - builder.add(new MergeJoinForSortedInputOptimizer(metadata, sqlParser)); + builder.add(new MergeJoinForSortedInputOptimizer(metadata, sqlParser, featuresConfig.isNativeExecutionEnabled())); // Optimizers above this don't understand local exchanges, so be careful moving this. builder.add(new AddLocalExchanges(metadata, sqlParser, featuresConfig.isNativeExecutionEnabled())); @@ -934,7 +934,7 @@ public PlanOptimizers( statsCalculator, costCalculator, ImmutableList.of(), - ImmutableSet.of(new RuntimeReorderJoinSides(metadata, sqlParser)))); + ImmutableSet.of(new RuntimeReorderJoinSides(metadata, sqlParser, featuresConfig.isNativeExecutionEnabled())))); this.runtimeOptimizers = runtimeBuilder.build(); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/JoinSwappingUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/JoinSwappingUtils.java index 803a08e2308a8..4705add3365bb 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/JoinSwappingUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/JoinSwappingUtils.java @@ -69,7 +69,8 @@ public static Optional createRuntimeSwappedJoinNode( Lookup lookup, Session session, VariableAllocator variableAllocator, - PlanNodeIdAllocator idAllocator) + PlanNodeIdAllocator idAllocator, + boolean nativeExecution) { JoinNode swapped = joinNode.flipChildren(); @@ -81,7 +82,7 @@ public static Optional createRuntimeSwappedJoinNode( PlanNode resolvedSwappedLeft = lookup.resolve(newLeft); if (resolvedSwappedLeft instanceof ExchangeNode && resolvedSwappedLeft.getSources().size() == 1) { // Ensure the new probe after skipping the local exchange will satisfy the required probe side property - if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, parser, lookup, session, variableAllocator)) { + if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, parser, lookup, session, variableAllocator, nativeExecution)) { newLeft = resolvedSwappedLeft.getSources().get(0); // The HashGenerationOptimizer will generate hashVariables and append to the output layout of the nodes following the same order. Therefore, // we use the index of the old hashVariable in the ExchangeNode output layout to retrieve the hashVariable from the new left node, and feed @@ -105,7 +106,7 @@ public static Optional createRuntimeSwappedJoinNode( .map(EquiJoinClause::getRight) .collect(toImmutableList()); PlanNode newRight = swapped.getRight(); - if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, parser, lookup, session, variableAllocator)) { + if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, parser, lookup, session, variableAllocator, nativeExecution)) { if (getTaskConcurrency(session) > 1) { newRight = systemPartitionedExchange( idAllocator.getNextId(), @@ -137,7 +138,7 @@ public static Optional createRuntimeSwappedJoinNode( } // Check if the new probe side after removing unnecessary local exchange is valid. - public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, SqlParser parser, Lookup lookup, Session session, VariableAllocator variableAllocator) + public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, SqlParser parser, Lookup lookup, Session session, VariableAllocator variableAllocator, boolean nativeExecution) { StreamPreferredProperties requiredProbeProperty; if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) { @@ -146,7 +147,7 @@ public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata me else { requiredProbeProperty = defaultParallelism(session); } - StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator); + StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator, nativeExecution); return requiredProbeProperty.isSatisfiedBy(nodeProperty); } @@ -158,7 +159,8 @@ private static boolean checkBuildSidePropertySatisfied( SqlParser parser, Lookup lookup, Session session, - VariableAllocator variableAllocator) + VariableAllocator variableAllocator, + boolean nativeExecution) { StreamPreferredProperties requiredBuildProperty; if (getTaskConcurrency(session) > 1) { @@ -167,7 +169,7 @@ private static boolean checkBuildSidePropertySatisfied( else { requiredBuildProperty = singleStream(); } - StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator); + StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, parser, lookup, session, variableAllocator, nativeExecution); return requiredBuildProperty.isSatisfiedBy(nodeProperty); } @@ -177,13 +179,14 @@ private static StreamPropertyDerivations.StreamProperties derivePropertiesRecurs SqlParser parser, Lookup lookup, Session session, - VariableAllocator variableAllocator) + VariableAllocator variableAllocator, + boolean nativeExecution) { PlanNode actual = lookup.resolve(node); List inputProperties = actual.getSources().stream() - .map(source -> derivePropertiesRecursively(source, metadata, parser, lookup, session, variableAllocator)) + .map(source -> derivePropertiesRecursively(source, metadata, parser, lookup, session, variableAllocator, nativeExecution)) .collect(toImmutableList()); - return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session, TypeProvider.viewOf(variableAllocator.getVariables()), parser); + return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session, TypeProvider.viewOf(variableAllocator.getVariables()), parser, nativeExecution); } public static boolean isBelowBroadcastLimit(PlanNode planNode, Rule.Context context) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RuntimeReorderJoinSides.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RuntimeReorderJoinSides.java index 34ff2a8216adc..b4dc8e1bbcc35 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RuntimeReorderJoinSides.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RuntimeReorderJoinSides.java @@ -46,11 +46,20 @@ public class RuntimeReorderJoinSides private final Metadata metadata; private final SqlParser parser; + private final boolean nativeExecution; public RuntimeReorderJoinSides(Metadata metadata, SqlParser parser) { this.metadata = requireNonNull(metadata, "metadata is null"); this.parser = requireNonNull(parser, "parser is null"); + this.nativeExecution = false; + } + + public RuntimeReorderJoinSides(Metadata metadata, SqlParser parser, boolean nativeExecution) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + this.parser = requireNonNull(parser, "parser is null"); + this.nativeExecution = nativeExecution; } @Override @@ -100,7 +109,7 @@ public Result apply(JoinNode joinNode, Captures captures, Context context) return Result.empty(); } - Optional rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, parser, context.getLookup(), context.getSession(), context.getVariableAllocator(), context.getIdAllocator()); + Optional rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, parser, context.getLookup(), context.getSession(), context.getVariableAllocator(), context.getIdAllocator(), nativeExecution); if (rewrittenNode.isPresent()) { log.debug(format("Probe size: %.2f is smaller than Build size: %.2f => invoke runtime join swapping on JoinNode ID: %s.", leftOutputSizeInBytes, rightOutputSizeInBytes, joinNode.getId())); return Result.ofPlanNode(rewrittenNode.get()); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index a2765f36aa516..1f033e294b13d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -837,7 +837,7 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, StreamPreferredProp parentPreferences.constrainTo(node.getProbeSource().getOutputVariables()).withDefaultParallelism(session)); // index source does not support local parallel and must produce a single stream - StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session, types, parser); + StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session, types, parser, nativeExecution); checkArgument(indexStreamProperties.getDistribution() == SINGLE, "index source must be single stream"); PlanWithProperties index = new PlanWithProperties(node.getIndexSource(), indexStreamProperties); @@ -933,12 +933,12 @@ private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, List inputProperties) { - return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, types, parser)); + return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, types, parser, nativeExecution)); } private PlanWithProperties accept(PlanNode node, StreamPreferredProperties context) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java index ba8edbd8a610b..3b481b3ec2cc4 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java @@ -42,11 +42,13 @@ public class MergeJoinForSortedInputOptimizer private final Metadata metadata; private final SqlParser parser; private boolean isEnabledForTesting; + private final boolean nativeExecution; - public MergeJoinForSortedInputOptimizer(Metadata metadata, SqlParser parser) + public MergeJoinForSortedInputOptimizer(Metadata metadata, SqlParser parser, boolean nativeExecution) { this.metadata = requireNonNull(metadata, "metadata is null"); this.parser = requireNonNull(parser, "parser is null"); + this.nativeExecution = nativeExecution; } @Override @@ -141,8 +143,8 @@ public PlanNode visitJoin(JoinNode node, RewriteContext context) private boolean meetsDataRequirement(PlanNode left, PlanNode right, JoinNode node) { // Acquire data properties for both left and right side - StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, types, parser); - StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, types, parser); + StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, types, parser, nativeExecution); + StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, types, parser, nativeExecution); List leftJoinColumns = node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList()); List rightJoinColumns = node.getCriteria().stream() diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java index e5f87a3783d6d..0e5bdd9ff2c8d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java @@ -98,20 +98,20 @@ public final class StreamPropertyDerivations { private StreamPropertyDerivations() {} - public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session, TypeProvider types, SqlParser parser) + public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session, TypeProvider types, SqlParser parser, boolean nativeExecution) { List inputProperties = node.getSources().stream() - .map(source -> derivePropertiesRecursively(source, metadata, session, types, parser)) + .map(source -> derivePropertiesRecursively(source, metadata, session, types, parser, nativeExecution)) .collect(toImmutableList()); - return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session, types, parser); + return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session, types, parser, nativeExecution); } - public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser) + public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser, boolean nativeExecution) { - return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session, types, parser); + return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session, types, parser, nativeExecution); } - public static StreamProperties deriveProperties(PlanNode node, List inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser) + public static StreamProperties deriveProperties(PlanNode node, List inputProperties, Metadata metadata, Session session, TypeProvider types, SqlParser parser, boolean nativeExecution) { requireNonNull(node, "node is null"); requireNonNull(inputProperties, "inputProperties is null"); @@ -133,7 +133,7 @@ public static StreamProperties deriveProperties(PlanNode node, List @@ -154,12 +154,14 @@ private static class Visitor private final Metadata metadata; private final Session session; private final TypeProvider types; + private final boolean nativeExecution; - private Visitor(Metadata metadata, Session session, TypeProvider types) + private Visitor(Metadata metadata, Session session, TypeProvider types, boolean nativeExecution) { this.metadata = metadata; this.session = session; this.types = types; + this.nativeExecution = nativeExecution; } @Override @@ -185,9 +187,16 @@ public StreamProperties visitJoin(JoinNode node, List inputPro .translate(column -> PropertyDerivations.filterOrRewrite(outputs, node.getCriteria(), column)) .unordered(unordered); case LEFT: - return leftProperties - .translate(column -> PropertyDerivations.filterIfMissing(outputs, column)) - .unordered(unordered); + if (nativeExecution && node.getCriteria().isEmpty()) { + // This maps to a NestedLoopJoin in Native engine. The NestedLoopJoin output is not + // partitioned or ordered. + return new StreamProperties(MULTIPLE, Optional.empty(), false); + } + else { + return leftProperties + .translate(column -> PropertyDerivations.filterIfMissing(outputs, column)) + .unordered(unordered); + } case RIGHT: // since this is a right join, none of the matched output rows will contain nulls // in the left partitioning columns, and all of the unmatched rows will have diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java index 492ea797165cc..9c678d7f9e07a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java @@ -55,7 +55,7 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode) new TypeValidator(), new VerifyNoFilteredAggregations(), new VerifyNoIntermediateFormExpression(), - new ValidateStreamingJoins()) + new ValidateStreamingJoins(featuresConfig.isNativeExecutionEnabled())) .putAll( Stage.FINAL, new CheckUnsupportedExternalFunctions(), @@ -64,8 +64,8 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode) new TypeValidator(), new VerifyOnlyOneOutputNode(), new VerifyNoFilteredAggregations(), - new ValidateAggregationsWithDefaultValues(forceSingleNode), - new ValidateStreamingAggregations(), + new ValidateAggregationsWithDefaultValues(forceSingleNode, featuresConfig.isNativeExecutionEnabled()), + new ValidateStreamingAggregations(featuresConfig.isNativeExecutionEnabled()), new VerifyNoIntermediateFormExpression(), new VerifyProjectionLocality(), new DynamicFiltersChecker(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java index 7dcc68c7b4d3a..05d29e2307eaa 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java @@ -54,9 +54,18 @@ public class ValidateAggregationsWithDefaultValues { private final boolean forceSingleNode; + private final boolean nativeExecution; + public ValidateAggregationsWithDefaultValues(boolean forceSingleNode) { this.forceSingleNode = forceSingleNode; + this.nativeExecution = false; + } + + public ValidateAggregationsWithDefaultValues(boolean forceSingleNode, boolean nativeExecution) + { + this.forceSingleNode = forceSingleNode; + this.nativeExecution = nativeExecution; } @Override @@ -126,7 +135,7 @@ public Optional visitAggregation(AggregationNode node, Void conte if (!seenExchanges.localRepartitionExchange) { // No local repartition exchange between final and partial aggregation. // Make sure that final aggregation operators are executed by single thread. - StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session, types, parser); + StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session, types, parser, nativeExecution); checkArgument(localProperties.isSingleStream(), "Final aggregation with default value not separated from partial aggregation by local hash exchange"); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java index 83253754282ea..f9288816ca53f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java @@ -43,10 +43,22 @@ public class ValidateStreamingAggregations implements Checker { + private final boolean nativeExecution; + + public ValidateStreamingAggregations() + { + this.nativeExecution = false; + } + + public ValidateStreamingAggregations(boolean nativeExecution) + { + this.nativeExecution = nativeExecution; + } + @Override public void validate(PlanNode planNode, Session session, Metadata metadata, SqlParser sqlParser, TypeProvider types, WarningCollector warningCollector) { - planNode.accept(new Visitor(session, metadata, sqlParser, types, warningCollector), null); + planNode.accept(new Visitor(session, metadata, sqlParser, types, warningCollector, nativeExecution), null); } private static final class Visitor @@ -57,14 +69,16 @@ private static final class Visitor private final SqlParser sqlParser; private final TypeProvider types; private final WarningCollector warningCollector; + private final boolean nativeExecution; - private Visitor(Session session, Metadata metadata, SqlParser sqlParser, TypeProvider types, WarningCollector warningCollector) + private Visitor(Session session, Metadata metadata, SqlParser sqlParser, TypeProvider types, WarningCollector warningCollector, boolean nativeExecution) { this.session = session; this.metadata = metadata; this.sqlParser = sqlParser; this.types = types; this.warningCollector = warningCollector; + this.nativeExecution = nativeExecution; } @Override @@ -81,7 +95,7 @@ public Void visitAggregation(AggregationNode node, Void context) return null; } - StreamProperties properties = derivePropertiesRecursively(node.getSource(), metadata, session, types, sqlParser); + StreamProperties properties = derivePropertiesRecursively(node.getSource(), metadata, session, types, sqlParser, false); List> desiredProperties = ImmutableList.of(new GroupingProperty<>(node.getPreGroupedVariables())); Iterator>> matchIterator = LocalProperties.match(properties.getLocalProperties(), desiredProperties).iterator(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java index 1c7a0ce218cac..f25b423a0f598 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java @@ -45,10 +45,22 @@ public class ValidateStreamingJoins implements Checker { + private final boolean nativeExecution; + + public ValidateStreamingJoins() + { + this.nativeExecution = false; + } + + public ValidateStreamingJoins(boolean nativeExecution) + { + this.nativeExecution = nativeExecution; + } + @Override public void validate(PlanNode planNode, Session session, Metadata metadata, SqlParser sqlParser, TypeProvider types, WarningCollector warningCollector) { - planNode.accept(new Visitor(session, metadata, sqlParser, types, warningCollector), null); + planNode.accept(new Visitor(session, metadata, sqlParser, types, warningCollector, nativeExecution), null); } private static final class Visitor @@ -59,14 +71,16 @@ private static final class Visitor private final SqlParser sqlParser; private final TypeProvider types; private final WarningCollector warningCollector; + private final boolean nativeExecution; - private Visitor(Session session, Metadata metadata, SqlParser sqlParser, TypeProvider types, WarningCollector warningCollector) + private Visitor(Session session, Metadata metadata, SqlParser sqlParser, TypeProvider types, WarningCollector warningCollector, boolean nativeExecution) { this.session = session; this.metadata = metadata; this.sqlParser = sqlParser; this.types = types; this.warningCollector = warningCollector; + this.nativeExecution = nativeExecution; } @Override @@ -91,7 +105,7 @@ public Void visitJoin(JoinNode node, Void context) else { requiredBuildProperty = singleStream(); } - StreamProperties buildProperties = derivePropertiesRecursively(node.getRight(), metadata, session, types, sqlParser); + StreamProperties buildProperties = derivePropertiesRecursively(node.getRight(), metadata, session, types, sqlParser, nativeExecution); checkArgument(requiredBuildProperty.isSatisfiedBy(buildProperties), "Build side needs an additional local exchange for join: %s", node.getId()); StreamPreferredProperties requiredProbeProperty; @@ -101,7 +115,7 @@ public Void visitJoin(JoinNode node, Void context) else { requiredProbeProperty = defaultParallelism(session); } - StreamProperties probeProperties = derivePropertiesRecursively(node.getLeft(), metadata, session, types, sqlParser); + StreamProperties probeProperties = derivePropertiesRecursively(node.getLeft(), metadata, session, types, sqlParser, false); checkArgument(requiredProbeProperty.isSatisfiedBy(probeProperties), "Probe side needs an additional local exchange for join: %s", node.getId()); } return null; diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java index 3119adc80135b..4bcac6f40a6c6 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java @@ -1635,6 +1635,80 @@ public void testColumnFilter() assertQuery(session, format("SELECT c_boolean, c_bigint, c_double, c_varchar, c_varbinary FROM %s WHERE c_varbinary = to_ieee754_64(1)", tmpTableName)); } + @Test + public void testCorrelatedExistsSubqueries() + { + // projection + assertQuery( + "SELECT EXISTS(SELECT 1 FROM (VALUES 1, 1, 1, 2, 2, 3, 4) i(a) WHERE i.a < o.a AND i.a < 4) " + + "FROM (VALUES 0, 3, 3, 5) o(a)", + "VALUES false, true, true, true"); + assertQuery( + "SELECT EXISTS(SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3) " + + "FROM lineitem l LIMIT 1"); + + assertQuery( + "SELECT count(*) FROM orders o " + + "WHERE EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 1000 = 0)"); // h2 is slow + assertQuery( + "SELECT count(*) FROM lineitem l " + + "WHERE EXISTS(SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3)"); + + // order by + assertQuery( + "SELECT orderkey FROM orders o ORDER BY " + + "EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 10000 = 0)" + + "LIMIT 1"); // h2 is slow + assertQuery( + "SELECT orderkey FROM lineitem l ORDER BY " + + "EXISTS(SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3)"); + + // group by + assertQuery( + "SELECT max(o.orderdate), o.orderkey, " + + "EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 10000 = 0) " + + "FROM orders o GROUP BY o.orderkey ORDER BY o.orderkey LIMIT 1"); + assertQuery( + "SELECT max(o.orderdate), o.orderkey " + + "FROM orders o " + + "GROUP BY o.orderkey " + + "HAVING EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 10000 = 0)" + + "ORDER BY o.orderkey LIMIT 1"); // h2 is slow + assertQuery( + "SELECT max(o.orderdate), o.orderkey FROM orders o " + + "GROUP BY o.orderkey, EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 10000 = 0)" + + "ORDER BY o.orderkey LIMIT 1"); // h2 is slow + assertQuery( + "SELECT max(l.quantity), l.orderkey, EXISTS(SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3) FROM lineitem l " + + "GROUP BY l.orderkey"); + assertQuery( + "SELECT max(l.quantity), l.orderkey FROM lineitem l " + + "GROUP BY l.orderkey " + + "HAVING EXISTS (SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3)"); + assertQuery( + "SELECT max(l.quantity), l.orderkey FROM lineitem l " + + "GROUP BY l.orderkey, EXISTS (SELECT 1 WHERE l.orderkey > 0 OR l.orderkey != 3)"); + + // join + assertQuery( + "SELECT count(*) " + + "FROM (SELECT * FROM orders ORDER BY orderkey LIMIT 10) o1 " + + "JOIN (SELECT * FROM orders ORDER BY orderkey LIMIT 5) o2 " + + "ON NOT EXISTS(SELECT 1 FROM orders i WHERE o1.orderkey < o2.orderkey AND i.orderkey % 10000 = 0)"); + assertQueryFails( + "SELECT count(*) FROM orders o1 LEFT JOIN orders o2 " + + "ON NOT EXISTS(SELECT 1 FROM orders i WHERE o1.orderkey < o2.orderkey)", + "line .*: Correlated subquery in given context is not supported"); + + // subrelation + assertQuery( + "SELECT count(*) FROM orders o " + + "WHERE (SELECT * FROM (SELECT EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 10000 = 0)))"); // h2 is slow + assertQuery( + "SELECT count(*) FROM orders o " + + "WHERE (SELECT * FROM (SELECT EXISTS(SELECT 1 WHERE o.orderkey > 10 OR o.orderkey != 3)))"); + } + private void assertQueryResultCount(String sql, int expectedResultCount) { assertEquals(getQueryRunner().execute(sql).getRowCount(), expectedResultCount); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/optimizers/PickJoinSides.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/optimizers/PickJoinSides.java index e1413709c6ea5..2f456ab2551c4 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/optimizers/PickJoinSides.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/optimizers/PickJoinSides.java @@ -104,7 +104,8 @@ public Result apply(JoinNode joinNode, Captures captures, Context context) // if we don't have exact costs for the join, but based on source tables we think the left side // is very small or much smaller than the right, then flip the join. if (rightSize > leftSize || (isSizeBasedJoinDistributionTypeEnabled(context.getSession()) && (Double.isNaN(leftSize) || Double.isNaN(rightSize)) && isLeftSideSmall(joinNode, context))) { - rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, sqlParser, context.getLookup(), context.getSession(), context.getVariableAllocator(), context.getIdAllocator()); + // This is never used for Prestissimo. + rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, sqlParser, context.getLookup(), context.getSession(), context.getVariableAllocator(), context.getIdAllocator(), false); } return rewrittenNode.map(Result::ofPlanNode).orElseGet(Result::empty);