Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode)
new TypeValidator(),
new VerifyNoFilteredAggregations(),
new VerifyNoIntermediateFormExpression(),
new ValidateStreamingJoins())
new ValidateStreamingJoins(featuresConfig))
.putAll(
Stage.FINAL,
new CheckUnsupportedExternalFunctions(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties;
import com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
Expand All @@ -39,26 +40,36 @@
import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.derivePropertiesRecursively;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class ValidateStreamingJoins
implements Checker
{
private final boolean nativeExecutionEnabled;

public ValidateStreamingJoins(FeaturesConfig featuresConfig)
{
this.nativeExecutionEnabled = requireNonNull(featuresConfig).isNativeExecutionEnabled();
}

@Override
public void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
{
planNode.accept(new Visitor(session, metadata), null);
planNode.accept(new Visitor(session, metadata, nativeExecutionEnabled), null);
}

private static final class Visitor
extends InternalPlanVisitor<Void, Void>
{
private final Session session;
private final Metadata metadata;
private final boolean nativeExecutionEnabled;

private Visitor(Session session, Metadata metadata)
private Visitor(Session session, Metadata metadata, boolean nativeExecutionEnabled)
{
this.session = session;
this.metadata = metadata;
this.nativeExecutionEnabled = nativeExecutionEnabled;
}

@Override
Expand Down Expand Up @@ -87,7 +98,7 @@ public Void visitJoin(JoinNode node, Void context)
checkArgument(requiredBuildProperty.isSatisfiedBy(buildProperties), "Build side needs an additional local exchange for join: %s", node.getId());

StreamPreferredProperties requiredProbeProperty;
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
if (isSpillEnabled(session) && isJoinSpillingEnabled(session) && !nativeExecutionEnabled) {
requiredProbeProperty = fixedParallelism();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,36 @@
package com.facebook.presto.sql.planner.sanity;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.cost.HistoryBasedOptimizationConfig;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.execution.warnings.WarningCollectorConfig;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FunctionsConfig;
import com.facebook.presto.sql.planner.CompilerConfig;
import com.facebook.presto.sql.planner.assertions.BasePlanTest;
import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.testing.TestingTransactionHandle;
import com.facebook.presto.tpch.TpchColumnHandle;
import com.facebook.presto.tpch.TpchTableHandle;
import com.facebook.presto.tpch.TpchTableLayoutHandle;
import com.facebook.presto.tracing.TracingConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.AfterClass;
Expand All @@ -40,15 +53,15 @@
import java.util.Optional;
import java.util.function.Function;

import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;

public class TestValidateStreamingJoins
extends BasePlanTest
{
private Session testSession;
private Session defaultSession;
private Session spillSession;
private Metadata metadata;
private PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
private TableHandle nationTableHandle;
Expand All @@ -62,7 +75,26 @@ public void setup()
Session.SessionBuilder sessionBuilder = testSessionBuilder()
.setCatalog("local")
.setSchema("tiny");
testSession = sessionBuilder.build();
defaultSession = sessionBuilder.build();
spillSession = testSessionBuilder(
new SessionPropertyManager(new SystemSessionProperties(
new QueryManagerConfig(),
new TaskManagerConfig(),
new MemoryManagerConfig(),
new FeaturesConfig().setSpillerSpillPaths("/path/to/nowhere"),
new FunctionsConfig(),
new NodeMemoryConfig(),
new WarningCollectorConfig(),
new NodeSchedulerConfig(),
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig())))
.setCatalog("local")
.setSchema("tiny")
.setSystemProperty("spill_enabled", "true")
.setSystemProperty("join_spill_enabled", "true")
.build();
metadata = getQueryRunner().getMetadata();

TpchTableHandle nationTpchTableHandle = new TpchTableHandle("nation", 1.0);
Expand All @@ -85,7 +117,8 @@ public void setup()
@AfterClass(alwaysRun = true)
public void tearDown()
{
testSession = null;
defaultSession = null;
spillSession = null;
metadata = null;
idAllocator = null;
nationTableHandle = null;
Expand Down Expand Up @@ -123,14 +156,58 @@ public void testValidateFailed()
Optional.empty()));
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Probe side needs an additional local exchange for join: [0-9]*")
public void testValidateFailsForJavaSpillEnabled()
{
validatePlan(
p -> p.join(
INNER,
p.tableScan(nationTableHandle, ImmutableList.of(p.variable("nationkeyN", BIGINT)), ImmutableMap.of(p.variable("nationkeyN", BIGINT), nationColumnHandle)),
p.exchange(e -> e
.scope(ExchangeNode.Scope.LOCAL)
.type(ExchangeNode.Type.REPARTITION)
.addSource(p.tableScan(supplierTableHandle, ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableMap.of(p.variable("nationkeyS", BIGINT), nationColumnHandle, p.variable("suppkey", BIGINT), suppColumnHandle)))
.addInputsSet(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)))
.fixedHashDistributionPartitioningScheme(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableList.of(p.variable("nationkeyS", BIGINT)))),
ImmutableList.of(new EquiJoinClause(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT))),
ImmutableList.of(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)),
Optional.empty()),
false,
spillSession);
}

@Test
public void testValidateSucceedsForNativeSpillEnabled()
{
validatePlan(
p -> p.join(
INNER,
p.tableScan(nationTableHandle, ImmutableList.of(p.variable("nationkeyN", BIGINT)), ImmutableMap.of(p.variable("nationkeyN", BIGINT), nationColumnHandle)),
p.exchange(e -> e
.scope(ExchangeNode.Scope.LOCAL)
.type(ExchangeNode.Type.REPARTITION)
.addSource(p.tableScan(supplierTableHandle, ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableMap.of(p.variable("nationkeyS", BIGINT), nationColumnHandle, p.variable("suppkey", BIGINT), suppColumnHandle)))
.addInputsSet(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)))
.fixedHashDistributionPartitioningScheme(ImmutableList.of(p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)), ImmutableList.of(p.variable("nationkeyS", BIGINT)))),
ImmutableList.of(new EquiJoinClause(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT))),
ImmutableList.of(p.variable("nationkeyN", BIGINT), p.variable("nationkeyS", BIGINT), p.variable("suppkey", BIGINT)),
Optional.empty()),
true,
spillSession);
}

private void validatePlan(Function<PlanBuilder, PlanNode> planProvider)
{
PlanBuilder builder = new PlanBuilder(TEST_SESSION, idAllocator, metadata);
validatePlan(planProvider, false, defaultSession);
}

private void validatePlan(Function<PlanBuilder, PlanNode> planProvider, boolean nativeExecutionEnabled, Session testSession)
{
PlanBuilder builder = new PlanBuilder(testSession, idAllocator, metadata);
PlanNode planNode = planProvider.apply(builder);
TypeProvider types = builder.getTypes();
getQueryRunner().inTransaction(testSession, session -> {
session.getCatalog().ifPresent(catalog -> metadata.getCatalogHandle(session, catalog));
new ValidateStreamingJoins().validate(planNode, session, metadata, WarningCollector.NOOP);
new ValidateStreamingJoins(new FeaturesConfig().setNativeExecutionEnabled(nativeExecutionEnabled)).validate(planNode, session, metadata, WarningCollector.NOOP);
return null;
});
}
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 117 files