diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java index bddba01cfad13..0263b88a93fc2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java @@ -53,7 +53,7 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode) new TypeValidator(), new VerifyNoFilteredAggregations(), new VerifyNoIntermediateFormExpression(), - new ValidateStreamingJoins()) + new ValidateStreamingJoins(featuresConfig)) .putAll( Stage.FINAL, new CheckUnsupportedExternalFunctions(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java index bf0f10806bf5d..412c39ff02720 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingJoins.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.plan.EquiJoinClause; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties; import com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; @@ -39,14 +40,22 @@ import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.derivePropertiesRecursively; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; public class ValidateStreamingJoins implements Checker { + private final boolean nativeExecutionEnabled; + + public ValidateStreamingJoins(FeaturesConfig featuresConfig) + { + this.nativeExecutionEnabled = requireNonNull(featuresConfig).isNativeExecutionEnabled(); + } + @Override public void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) { - planNode.accept(new Visitor(session, metadata), null); + planNode.accept(new Visitor(session, metadata, nativeExecutionEnabled), null); } private static final class Visitor @@ -54,11 +63,13 @@ private static final class Visitor { private final Session session; private final Metadata metadata; + private final boolean nativeExecutionEnabled; - private Visitor(Session session, Metadata metadata) + private Visitor(Session session, Metadata metadata, boolean nativeExecutionEnabled) { this.session = session; this.metadata = metadata; + this.nativeExecutionEnabled = nativeExecutionEnabled; } @Override @@ -87,7 +98,7 @@ public Void visitJoin(JoinNode node, Void context) checkArgument(requiredBuildProperty.isSatisfiedBy(buildProperties), "Build side needs an additional local exchange for join: %s", node.getId()); StreamPreferredProperties requiredProbeProperty; - if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) { + if (isSpillEnabled(session) && isJoinSpillingEnabled(session) && !nativeExecutionEnabled) { requiredProbeProperty = fixedParallelism(); } else { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestValidateStreamingJoins.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestValidateStreamingJoins.java index f0084a0c816bd..4edbbdee229b8 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestValidateStreamingJoins.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestValidateStreamingJoins.java @@ -14,8 +14,17 @@ package com.facebook.presto.sql.planner.sanity; import com.facebook.presto.Session; +import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.cost.HistoryBasedOptimizationConfig; +import com.facebook.presto.execution.QueryManagerConfig; +import com.facebook.presto.execution.TaskManagerConfig; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; +import com.facebook.presto.execution.warnings.WarningCollectorConfig; +import com.facebook.presto.memory.MemoryManagerConfig; +import com.facebook.presto.memory.NodeMemoryConfig; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.TableHandle; @@ -23,7 +32,10 @@ import com.facebook.presto.spi.plan.EquiJoinClause; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; -import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.spiller.NodeSpillConfig; +import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.analyzer.FunctionsConfig; +import com.facebook.presto.sql.planner.CompilerConfig; import com.facebook.presto.sql.planner.assertions.BasePlanTest; import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -31,6 +43,7 @@ import com.facebook.presto.tpch.TpchColumnHandle; import com.facebook.presto.tpch.TpchTableHandle; import com.facebook.presto.tpch.TpchTableLayoutHandle; +import com.facebook.presto.tracing.TracingConfig; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.testng.annotations.AfterClass; @@ -40,7 +53,6 @@ import java.util.Optional; import java.util.function.Function; -import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.spi.plan.JoinType.INNER; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; @@ -48,7 +60,8 @@ public class TestValidateStreamingJoins extends BasePlanTest { - private Session testSession; + private Session defaultSession; + private Session spillSession; private Metadata metadata; private PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(); private TableHandle nationTableHandle; @@ -62,7 +75,26 @@ public void setup() Session.SessionBuilder sessionBuilder = testSessionBuilder() .setCatalog("local") .setSchema("tiny"); - testSession = sessionBuilder.build(); + defaultSession = sessionBuilder.build(); + spillSession = testSessionBuilder( + new SessionPropertyManager(new SystemSessionProperties( + new QueryManagerConfig(), + new TaskManagerConfig(), + new MemoryManagerConfig(), + new FeaturesConfig().setSpillerSpillPaths("/path/to/nowhere"), + new FunctionsConfig(), + new NodeMemoryConfig(), + new WarningCollectorConfig(), + new NodeSchedulerConfig(), + new NodeSpillConfig(), + new TracingConfig(), + new CompilerConfig(), + new HistoryBasedOptimizationConfig()))) + .setCatalog("local") + .setSchema("tiny") + .setSystemProperty("spill_enabled", "true") + .setSystemProperty("join_spill_enabled", "true") + .build(); metadata = getQueryRunner().getMetadata(); TpchTableHandle nationTpchTableHandle = new TpchTableHandle("nation", 1.0); @@ -85,7 +117,8 @@ public void setup() @AfterClass(alwaysRun = true) public void tearDown() { - testSession = null; + defaultSession = null; + spillSession = null; metadata = null; idAllocator = null; nationTableHandle = null; @@ -123,14 +156,58 @@ public void testValidateFailed() Optional.empty())); } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Probe side needs an additional local exchange for join: [0-9]*") + public void testValidateFailsForJavaSpillEnabled() + { + validatePlan( + p -> p.join( + INNER, + p.tableScan(nationTableHandle, ImmutableList.of(p.variable("nationkeyN", BIGINT)), ImmutableMap.of(p.variable("nationkeyN", BIGINT), nationColumnHandle)), + p.exchange(e -> e + .scope(ExchangeNode.Scope.LOCAL) + .type(ExchangeNode.Type.REPARTITION) + .addSource(p.tableScan(supplierTableHandle, ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableMap.of(p.variable("nationkeyS", BIGINT), nationColumnHandle, p.variable("suppkey", BIGINT), suppColumnHandle))) + .addInputsSet(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT))) + .fixedHashDistributionPartitioningScheme(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableList.of(p.variable("nationkeyS", BIGINT)))), + ImmutableList.of(new EquiJoinClause(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT))), + ImmutableList.of(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), + Optional.empty()), + false, + spillSession); + } + + @Test + public void testValidateSucceedsForNativeSpillEnabled() + { + validatePlan( + p -> p.join( + INNER, + p.tableScan(nationTableHandle, ImmutableList.of(p.variable("nationkeyN", BIGINT)), ImmutableMap.of(p.variable("nationkeyN", BIGINT), nationColumnHandle)), + p.exchange(e -> e + .scope(ExchangeNode.Scope.LOCAL) + .type(ExchangeNode.Type.REPARTITION) + .addSource(p.tableScan(supplierTableHandle, ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableMap.of(p.variable("nationkeyS", BIGINT), nationColumnHandle, p.variable("suppkey", BIGINT), suppColumnHandle))) + .addInputsSet(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT))) + .fixedHashDistributionPartitioningScheme(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableList.of(p.variable("nationkeyS", BIGINT)))), + ImmutableList.of(new EquiJoinClause(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT))), + ImmutableList.of(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), + Optional.empty()), + true, + spillSession); + } + private void validatePlan(Function planProvider) { - PlanBuilder builder = new PlanBuilder(TEST_SESSION, idAllocator, metadata); + validatePlan(planProvider, false, defaultSession); + } + + private void validatePlan(Function planProvider, boolean nativeExecutionEnabled, Session testSession) + { + PlanBuilder builder = new PlanBuilder(testSession, idAllocator, metadata); PlanNode planNode = planProvider.apply(builder); - TypeProvider types = builder.getTypes(); getQueryRunner().inTransaction(testSession, session -> { session.getCatalog().ifPresent(catalog -> metadata.getCatalogHandle(session, catalog)); - new ValidateStreamingJoins().validate(planNode, session, metadata, WarningCollector.NOOP); + new ValidateStreamingJoins(new FeaturesConfig().setNativeExecutionEnabled(nativeExecutionEnabled)).validate(planNode, session, metadata, WarningCollector.NOOP); return null; }); } diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 6e52cbda7c570..3bf1a74042e83 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 6e52cbda7c5705ece71cd79c612ddcf5d8a9ed14 +Subproject commit 3bf1a74042e83981630d24c1eaee190d1f31aac2