diff --git a/presto-main/src/main/java/com/facebook/presto/cost/CachingCostProvider.java b/presto-main/src/main/java/com/facebook/presto/cost/CachingCostProvider.java index f6e3acf007ed7..7e100df573146 100644 --- a/presto-main/src/main/java/com/facebook/presto/cost/CachingCostProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/cost/CachingCostProvider.java @@ -38,15 +38,21 @@ public class CachingCostProvider private final Lookup lookup; private final Session session; private final TypeProvider types; + private final PlanNodeSourceProvider sourceProvider; private final Map cache = new IdentityHashMap<>(); - public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Session session, TypeProvider types) + public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Optional memo, Lookup lookup, Session session, TypeProvider types) { - this(costCalculator, statsProvider, Optional.empty(), noLookup(), session, types); + this(costCalculator, statsProvider, memo, lookup, session, types, PlanNode::getSources); } - public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Optional memo, Lookup lookup, Session session, TypeProvider types) + public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Session session, TypeProvider types, PlanNodeSourceProvider sourceProvider) + { + this(costCalculator, statsProvider, Optional.empty(), noLookup(), session, types, sourceProvider); + } + + public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Optional memo, Lookup lookup, Session session, TypeProvider types, PlanNodeSourceProvider sourceProvider) { this.costCalculator = requireNonNull(costCalculator, "costCalculator is null"); this.statsProvider = requireNonNull(statsProvider, "statsProvider is null"); @@ -54,6 +60,7 @@ public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsPro this.lookup = requireNonNull(lookup, "lookup is null"); this.session = requireNonNull(session, "session is null"); this.types = requireNonNull(types, "types is null"); + this.sourceProvider = requireNonNull(sourceProvider, "sourceProvider is null"); } @Override @@ -95,7 +102,7 @@ private PlanNodeCostEstimate calculateCumulativeCost(PlanNode node) { PlanNodeCostEstimate localCosts = costCalculator.calculateCost(node, statsProvider, lookup, session, types); - PlanNodeCostEstimate sourcesCost = node.getSources().stream() + PlanNodeCostEstimate sourcesCost = sourceProvider.getSources(node).stream() .map(this::getCumulativeCost) .reduce(ZERO_COST, PlanNodeCostEstimate::add); diff --git a/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanCostCalculator.java b/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanCostCalculator.java new file mode 100644 index 0000000000000..e609dcbfd99b5 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanCostCalculator.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.cost; + +import com.facebook.presto.Session; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; +import com.facebook.presto.metadata.InternalNodeManager; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.iterative.Lookup; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.PlanNode; +import com.facebook.presto.sql.planner.plan.RemoteSourceNode; + +import java.util.function.IntSupplier; + +import static com.facebook.presto.cost.CostCalculatorUsingExchanges.calculateExchangeCost; +import static com.facebook.presto.cost.CostCalculatorUsingExchanges.currentNumberOfWorkerNodes; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE; +import static java.util.Objects.requireNonNull; + +public class FragmentedPlanCostCalculator + implements CostCalculator +{ + private final CostCalculator delegate; + private final FragmentedPlanSourceProvider sourceProvider; + private final IntSupplier numberOfNodes; + + public FragmentedPlanCostCalculator(FragmentedPlanSourceProvider sourceProvider, CostCalculator delegate, InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig) + { + this(delegate, sourceProvider, currentNumberOfWorkerNodes(nodeSchedulerConfig.isIncludeCoordinator(), nodeManager)); + } + + public FragmentedPlanCostCalculator(CostCalculator delegate, FragmentedPlanSourceProvider sourceProvider, IntSupplier numberOfNodes) + { + this.sourceProvider = requireNonNull(sourceProvider, "sourceProvider is null"); + this.delegate = requireNonNull(delegate, "delegate is null"); + this.numberOfNodes = requireNonNull(numberOfNodes, "numberOfNodes is null"); + } + + @Override + public PlanNodeCostEstimate calculateCost(PlanNode node, StatsProvider stats, Lookup lookup, Session session, TypeProvider types) + { + if (node instanceof RemoteSourceNode) { + return calculateRemoteSourceNodeCost((RemoteSourceNode) node, stats, types); + } + else { + return delegate.calculateCost(node, stats, lookup, session, types); + } + } + + private PlanNodeCostEstimate calculateRemoteSourceNodeCost(RemoteSourceNode node, StatsProvider stats, TypeProvider types) + { + PlanNodeCostEstimate costEstimate = PlanNodeCostEstimate.ZERO_COST; + ExchangeNode.Type exchangeType = node.getExchangeType(); + for (PlanNode source : sourceProvider.getSources(node)) { + PlanNodeCostEstimate exchangeCost = calculateExchangeCost(numberOfNodes.getAsInt(), stats.getStats(source), node.getOutputSymbols(), exchangeType, REMOTE, types); + costEstimate.add(exchangeCost); + } + return costEstimate; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanSourceProvider.java b/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanSourceProvider.java new file mode 100644 index 0000000000000..7a8915402b980 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanSourceProvider.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.cost; + +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.plan.PlanFragmentId; +import com.facebook.presto.sql.planner.plan.PlanNode; +import com.facebook.presto.sql.planner.plan.RemoteSourceNode; +import com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static java.util.Objects.requireNonNull; + +public class FragmentedPlanSourceProvider + implements PlanNodeSourceProvider +{ + private final Map fragments; + + public static FragmentedPlanSourceProvider create(List planFragments) + { + Map fragmentIdPlanNodeMap = planFragments.stream() + .collect(toImmutableMap(PlanFragment::getId, fragment -> fragment)); + return new FragmentedPlanSourceProvider(fragmentIdPlanNodeMap); + } + + private FragmentedPlanSourceProvider(Map fragments) + { + this.fragments = ImmutableMap.copyOf(requireNonNull(fragments, "fragments is null")); + } + + @Override + public List getSources(PlanNode node) + { + if (node instanceof RemoteSourceNode) { + return ((RemoteSourceNode) node).getSourceFragmentIds().stream() + .map(id -> { + verify(fragments.containsKey(id), "fragment id not in map: %s", id); + return fragments.get(id).getRoot(); + }) + .collect(toImmutableList()); + } + + return node.getSources(); + } + + public List getSourceFragments(RemoteSourceNode node) + { + return node.getSourceFragmentIds().stream() + .map(id -> { + verify(fragments.containsKey(id), "fragment id not in map: %s", id); + return fragments.get(id); + }) + .collect(toImmutableList()); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanStatsCalculator.java b/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanStatsCalculator.java new file mode 100644 index 0000000000000..ee6c1d5aff9e8 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/cost/FragmentedPlanStatsCalculator.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.cost; + +import com.facebook.presto.Session; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.Symbol; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.iterative.Lookup; +import com.facebook.presto.sql.planner.plan.PlanNode; +import com.facebook.presto.sql.planner.plan.RemoteSourceNode; + +import java.util.List; + +import static com.facebook.presto.cost.PlanNodeStatsEstimateMath.addStatsAndMaxDistinctValues; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class FragmentedPlanStatsCalculator + implements StatsCalculator +{ + private final StatsCalculator delegate; + private final FragmentedPlanSourceProvider sourceProvider; + + public FragmentedPlanStatsCalculator(StatsCalculator delegate, FragmentedPlanSourceProvider sourceProvider) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.sourceProvider = requireNonNull(sourceProvider, "source provider is null"); + } + + @Override + public PlanNodeStatsEstimate calculateStats(PlanNode node, StatsProvider sourceStats, Lookup lookup, Session session, TypeProvider types) + { + if (node instanceof RemoteSourceNode) { + return calculateRemoteSourceStats((RemoteSourceNode) node, sourceStats); + } + return delegate.calculateStats(node, sourceStats, lookup, session, types); + } + + private PlanNodeStatsEstimate calculateRemoteSourceStats(RemoteSourceNode node, StatsProvider statsProvider) + { + PlanNodeStatsEstimate estimate = null; + for (PlanFragment sourceFragment : sourceProvider.getSourceFragments(node)) { + PlanNodeStatsEstimate sourceStatsWithMappedSymbols = mapToOutputSymbols(statsProvider.getStats(sourceFragment.getRoot()), sourceFragment.getPartitioningScheme().getOutputLayout(), node.getOutputSymbols()); + + if (estimate != null) { + estimate = addStatsAndMaxDistinctValues(estimate, sourceStatsWithMappedSymbols); + } + else { + estimate = sourceStatsWithMappedSymbols; + } + } + + verify(estimate != null, "estimate is null"); + return estimate; + } + + private PlanNodeStatsEstimate mapToOutputSymbols(PlanNodeStatsEstimate estimate, List inputs, List outputs) + { + checkArgument(inputs.size() == outputs.size(), "Input symbols count does not match output symbols count"); + PlanNodeStatsEstimate.Builder mapped = PlanNodeStatsEstimate.builder() + .setOutputRowCount(estimate.getOutputRowCount()); + + for (int i = 0; i < inputs.size(); i++) { + mapped.addSymbolStatistics(outputs.get(i), estimate.getSymbolStatistics(inputs.get(i))); + } + + return mapped.build(); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/cost/PlanNodeSourceProvider.java b/presto-main/src/main/java/com/facebook/presto/cost/PlanNodeSourceProvider.java new file mode 100644 index 0000000000000..7583ab0fc623e --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/cost/PlanNodeSourceProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.cost; + +import com.facebook.presto.sql.planner.plan.PlanNode; + +import java.util.List; + +@FunctionalInterface +public interface PlanNodeSourceProvider +{ + List getSources(PlanNode node); +} diff --git a/presto-main/src/main/java/com/facebook/presto/event/query/QueryMonitor.java b/presto-main/src/main/java/com/facebook/presto/event/query/QueryMonitor.java index 377955e83392a..066d30b03bc61 100644 --- a/presto-main/src/main/java/com/facebook/presto/event/query/QueryMonitor.java +++ b/presto-main/src/main/java/com/facebook/presto/event/query/QueryMonitor.java @@ -15,6 +15,8 @@ import com.facebook.presto.client.NodeVersion; import com.facebook.presto.connector.ConnectorId; +import com.facebook.presto.cost.CostCalculator; +import com.facebook.presto.cost.StatsCalculator; import com.facebook.presto.eventlistener.EventListenerManager; import com.facebook.presto.execution.Column; import com.facebook.presto.execution.ExecutionFailureInfo; @@ -25,7 +27,9 @@ import com.facebook.presto.execution.TaskId; import com.facebook.presto.execution.TaskInfo; import com.facebook.presto.execution.TaskState; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; import com.facebook.presto.metadata.FunctionRegistry; +import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.operator.DriverStats; @@ -66,8 +70,6 @@ import java.util.Optional; import java.util.stream.Collectors; -import static com.facebook.presto.cost.PlanNodeCostEstimate.UNKNOWN_COST; -import static com.facebook.presto.cost.PlanNodeStatsEstimate.UNKNOWN_STATS; import static com.facebook.presto.sql.planner.planPrinter.PlanPrinter.textDistributedPlan; import static java.lang.Math.max; import static java.lang.Math.toIntExact; @@ -88,6 +90,10 @@ public class QueryMonitor private final SessionPropertyManager sessionPropertyManager; private final FunctionRegistry functionRegistry; private final int maxJsonLimit; + private final StatsCalculator statsCalculator; + private final CostCalculator costCalculator; + private final InternalNodeManager nodeManager; + private final NodeSchedulerConfig nodeSchedulerConfig; @Inject public QueryMonitor( @@ -98,7 +104,11 @@ public QueryMonitor( NodeVersion nodeVersion, SessionPropertyManager sessionPropertyManager, Metadata metadata, - QueryMonitorConfig config) + QueryMonitorConfig config, + StatsCalculator statsCalculator, + CostCalculator costCalculator, + InternalNodeManager nodeManager, + NodeSchedulerConfig nodeSchedulerConfig) { this.eventListenerManager = requireNonNull(eventListenerManager, "eventListenerManager is null"); this.stageInfoCodec = requireNonNull(stageInfoCodec, "stageInfoCodec is null"); @@ -109,6 +119,10 @@ public QueryMonitor( this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"); this.functionRegistry = requireNonNull(metadata, "metadata is null").getFunctionRegistry(); this.maxJsonLimit = toIntExact(requireNonNull(config, "config is null").getMaxOutputStageJsonSize().toBytes()); + this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null"); + this.costCalculator = requireNonNull(costCalculator, "costCalculator is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.nodeSchedulerConfig = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null"); } public void queryCreatedEvent(QueryInfo queryInfo) @@ -199,12 +213,13 @@ public void queryCompletedEvent(QueryInfo queryInfo) Optional plan = Optional.empty(); try { if (queryInfo.getOutputStage().isPresent()) { - // Stats and costs are suppress, since transaction is already completed plan = Optional.of(textDistributedPlan( queryInfo.getOutputStage().get(), functionRegistry, - (node, sourceStats, lookup, session, types) -> UNKNOWN_STATS, - (node, stats, lookup, session, types) -> UNKNOWN_COST, + statsCalculator, + costCalculator, + nodeManager, + nodeSchedulerConfig, queryInfo.getSession().toSession(sessionPropertyManager), false)); } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/ExplainAnalyzeContext.java b/presto-main/src/main/java/com/facebook/presto/execution/ExplainAnalyzeContext.java index 2d780090eef64..b75c9b5151dc0 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/ExplainAnalyzeContext.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/ExplainAnalyzeContext.java @@ -15,6 +15,8 @@ import com.facebook.presto.cost.CostCalculator; import com.facebook.presto.cost.StatsCalculator; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; +import com.facebook.presto.metadata.InternalNodeManager; import javax.inject.Inject; @@ -25,16 +27,22 @@ public class ExplainAnalyzeContext private final QueryPerformanceFetcher queryPerformanceFetcher; private final StatsCalculator statsCalculator; private final CostCalculator costCalculator; + private final InternalNodeManager nodeManager; + private final NodeSchedulerConfig nodeSchedulerConfig; @Inject public ExplainAnalyzeContext( QueryPerformanceFetcher queryPerformanceFetcher, StatsCalculator statsCalculator, - CostCalculator costCalculator) + CostCalculator costCalculator, + InternalNodeManager nodeManager, + NodeSchedulerConfig nodeSchedulerConfig) { this.queryPerformanceFetcher = requireNonNull(queryPerformanceFetcher, "queryPerformanceFetcher is null"); this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null"); this.costCalculator = requireNonNull(costCalculator, "costCalculator is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.nodeSchedulerConfig = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null"); } public QueryPerformanceFetcher getQueryPerformanceFetcher() @@ -51,4 +59,14 @@ public CostCalculator getCostCalculator() { return costCalculator; } + + public InternalNodeManager getNodeManager() + { + return nodeManager; + } + + public NodeSchedulerConfig getNodeSchedulerConfig() + { + return nodeSchedulerConfig; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java index 46cb3a03e787c..cd96c6bfe0ac1 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java @@ -181,6 +181,11 @@ public static MetadataManager createTestMetadataManager(CatalogManager catalogMa } public static MetadataManager createTestMetadataManager(CatalogManager catalogManager, FeaturesConfig featuresConfig) + { + return createTestMetadataManager(createTestTransactionManager(catalogManager), featuresConfig); + } + + public static MetadataManager createTestMetadataManager(TransactionManager transactionManager, FeaturesConfig featuresConfig) { TypeManager typeManager = new TypeRegistry(ImmutableSet.of(), featuresConfig); return new MetadataManager( @@ -191,7 +196,7 @@ public static MetadataManager createTestMetadataManager(CatalogManager catalogMa new SchemaPropertyManager(), new TablePropertyManager(), new ColumnPropertyManager(), - createTestTransactionManager(catalogManager)); + transactionManager); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ExplainAnalyzeOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/ExplainAnalyzeOperator.java index a4049062c28f6..9035a0c1939ba 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ExplainAnalyzeOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ExplainAnalyzeOperator.java @@ -19,7 +19,9 @@ import com.facebook.presto.execution.QueryPerformanceFetcher; import com.facebook.presto.execution.StageId; import com.facebook.presto.execution.StageInfo; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; import com.facebook.presto.metadata.FunctionRegistry; +import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.spi.Page; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.sql.planner.plan.PlanNodeId; @@ -45,6 +47,8 @@ public static class ExplainAnalyzeOperatorFactory private final FunctionRegistry functionRegistry; private final StatsCalculator statsCalculator; private final CostCalculator costCalculator; + private final InternalNodeManager nodeManager; + private final NodeSchedulerConfig nodeSchedulerConfig; private final boolean verbose; private boolean closed; @@ -55,6 +59,8 @@ public ExplainAnalyzeOperatorFactory( FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, + InternalNodeManager nodeManager, + NodeSchedulerConfig nodeSchedulerConfig, boolean verbose) { this.operatorId = operatorId; @@ -63,6 +69,8 @@ public ExplainAnalyzeOperatorFactory( this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry is null"); this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null"); this.costCalculator = requireNonNull(costCalculator, "costCalculator is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.nodeSchedulerConfig = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null"); this.verbose = verbose; } @@ -71,7 +79,7 @@ public Operator createOperator(DriverContext driverContext) { checkState(!closed, "Factory is already closed"); OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, ExplainAnalyzeOperator.class.getSimpleName()); - return new ExplainAnalyzeOperator(operatorContext, queryPerformanceFetcher, functionRegistry, statsCalculator, costCalculator, verbose); + return new ExplainAnalyzeOperator(operatorContext, queryPerformanceFetcher, functionRegistry, statsCalculator, costCalculator, nodeManager, nodeSchedulerConfig, verbose); } @Override @@ -83,7 +91,7 @@ public void noMoreOperators() @Override public OperatorFactory duplicate() { - return new ExplainAnalyzeOperatorFactory(operatorId, planNodeId, queryPerformanceFetcher, functionRegistry, statsCalculator, costCalculator, verbose); + return new ExplainAnalyzeOperatorFactory(operatorId, planNodeId, queryPerformanceFetcher, functionRegistry, statsCalculator, costCalculator, nodeManager, nodeSchedulerConfig, verbose); } } @@ -92,6 +100,8 @@ public OperatorFactory duplicate() private final FunctionRegistry functionRegistry; private final StatsCalculator statsCalculator; private final CostCalculator costCalculator; + private final InternalNodeManager nodeManager; + private final NodeSchedulerConfig nodeSchedulerConfig; private final boolean verbose; private boolean finishing; private boolean outputConsumed; @@ -102,6 +112,8 @@ public ExplainAnalyzeOperator( FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, + InternalNodeManager nodeManager, + NodeSchedulerConfig nodeSchedulerConfig, boolean verbose) { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); @@ -109,6 +121,8 @@ public ExplainAnalyzeOperator( this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry is null"); this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null"); this.costCalculator = requireNonNull(costCalculator, "costCalculator is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.nodeSchedulerConfig = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null"); this.verbose = verbose; } @@ -159,7 +173,7 @@ public Page getOutput() return null; } - String plan = textDistributedPlan(queryInfo.getOutputStage().get().getSubStages().get(0), functionRegistry, statsCalculator, costCalculator, operatorContext.getSession(), verbose); + String plan = textDistributedPlan(queryInfo.getOutputStage().get().getSubStages().get(0), functionRegistry, statsCalculator, costCalculator, nodeManager, nodeSchedulerConfig, operatorContext.getSession(), verbose); BlockBuilder builder = VARCHAR.createBlockBuilder(null, 1); VARCHAR.writeString(builder, plan); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/QueryExplainer.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/QueryExplainer.java index ff3987942dd31..e2e7c18d316c4 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/QueryExplainer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/QueryExplainer.java @@ -17,6 +17,8 @@ import com.facebook.presto.cost.CostCalculator; import com.facebook.presto.cost.StatsCalculator; import com.facebook.presto.execution.DataDefinitionTask; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; +import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.security.AccessControl; import com.facebook.presto.spi.PrestoException; @@ -57,6 +59,8 @@ public class QueryExplainer private final SqlParser sqlParser; private final StatsCalculator statsCalculator; private final CostCalculator costCalculator; + private final InternalNodeManager nodeManager; + private final NodeSchedulerConfig nodeSchedulerConfig; private final Map, DataDefinitionTask> dataDefinitionTask; @Inject @@ -69,6 +73,8 @@ public QueryExplainer( SqlParser sqlParser, StatsCalculator statsCalculator, CostCalculator costCalculator, + InternalNodeManager nodeManager, + NodeSchedulerConfig nodeSchedulerConfig, Map, DataDefinitionTask> dataDefinitionTask) { this( @@ -80,6 +86,8 @@ public QueryExplainer( sqlParser, statsCalculator, costCalculator, + nodeManager, + nodeSchedulerConfig, dataDefinitionTask); } @@ -92,6 +100,8 @@ public QueryExplainer( SqlParser sqlParser, StatsCalculator statsCalculator, CostCalculator costCalculator, + InternalNodeManager nodeManager, + NodeSchedulerConfig nodeSchedulerConfig, Map, DataDefinitionTask> dataDefinitionTask) { this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null"); @@ -102,6 +112,8 @@ public QueryExplainer( this.sqlParser = requireNonNull(sqlParser, "sqlParser is null"); this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null"); this.costCalculator = requireNonNull(costCalculator, "costCalculator is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.nodeSchedulerConfig = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null"); this.dataDefinitionTask = ImmutableMap.copyOf(requireNonNull(dataDefinitionTask, "dataDefinitionTask is null")); } @@ -124,7 +136,7 @@ public String getPlan(Session session, Statement statement, Type planType, List< return PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata.getFunctionRegistry(), statsCalculator, costCalculator, session, 0, false); case DISTRIBUTED: SubPlan subPlan = getDistributedPlan(session, statement, parameters); - return PlanPrinter.textDistributedPlan(subPlan, metadata.getFunctionRegistry(), statsCalculator, costCalculator, session, false); + return PlanPrinter.textDistributedPlan(subPlan, metadata.getFunctionRegistry(), statsCalculator, costCalculator, nodeManager, nodeSchedulerConfig, session, false); case IO: return IOPlanPrinter.textIOPlan(getLogicalPlan(session, statement, parameters).getRoot(), metadata, session); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index edecd9eae2cb7..e23908e83f422 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -15,6 +15,7 @@ import com.facebook.presto.Session; import com.facebook.presto.SystemSessionProperties; +import com.facebook.presto.cost.CostCalculatorUsingExchanges; import com.facebook.presto.execution.ExplainAnalyzeContext; import com.facebook.presto.execution.StageId; import com.facebook.presto.execution.TaskManagerConfig; @@ -738,7 +739,7 @@ public PhysicalOperation visitExplainAnalyze(ExplainAnalyzeNode node, LocalExecu ExplainAnalyzeContext analyzeContext = explainAnalyzeContext .orElseThrow(() -> new IllegalStateException("ExplainAnalyze can only run on coordinator")); PhysicalOperation source = node.getSource().accept(this, context); - + verify(analyzeContext.getCostCalculator() instanceof CostCalculatorUsingExchanges, "costCalculator not instance of CostCalculatorUsingExchanges but plan is distributed"); OperatorFactory operatorFactory = new ExplainAnalyzeOperatorFactory( context.getNextOperatorId(), node.getId(), @@ -746,6 +747,8 @@ public PhysicalOperation visitExplainAnalyze(ExplainAnalyzeNode node, LocalExecu metadata.getFunctionRegistry(), analyzeContext.getStatsCalculator(), analyzeContext.getCostCalculator(), + analyzeContext.getNodeManager(), + analyzeContext.getNodeSchedulerConfig(), node.isVerbose()); return new PhysicalOperation(operatorFactory, makeLayout(node), context, source); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index 1a86195f824e4..aa40df686b005 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -254,7 +254,7 @@ else if (exchange.getType() == ExchangeNode.Type.REPARTITION) { .map(PlanFragment::getId) .collect(toImmutableList()); - return new RemoteSourceNode(exchange.getId(), childrenIds, exchange.getOutputSymbols(), exchange.getOrderingScheme()); + return new RemoteSourceNode(exchange.getId(), childrenIds, exchange.getOutputSymbols(), exchange.getOrderingScheme(), exchange.getType()); } private SubPlan buildSubPlan(PlanNode node, FragmentProperties properties, RewriteContext context) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java index b1a6e48b2c080..5b97164d8d0d7 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -317,7 +317,8 @@ public PlanNode visitRemoteSource(RemoteSourceNode node, RewriteContext co node.getId(), node.getSourceFragmentIds(), canonicalizeAndDistinct(node.getOutputSymbols()), - node.getOrderingScheme().map(this::canonicalizeAndDistinct)); + node.getOrderingScheme().map(this::canonicalizeAndDistinct), + node.getExchangeType()); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java index 8fac6027e6e6c..498213d7ea48c 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java @@ -34,13 +34,15 @@ public class RemoteSourceNode private final List sourceFragmentIds; private final List outputs; private final Optional orderingScheme; + private final ExchangeNode.Type exchangeType; // This is needed to "unfragment" to compute stats correctly. @JsonCreator public RemoteSourceNode( @JsonProperty("id") PlanNodeId id, @JsonProperty("sourceFragmentIds") List sourceFragmentIds, @JsonProperty("outputs") List outputs, - @JsonProperty("orderingScheme") Optional orderingScheme) + @JsonProperty("orderingScheme") Optional orderingScheme, + @JsonProperty("exchangeType") ExchangeNode.Type exchangeType) { super(id); @@ -49,11 +51,12 @@ public RemoteSourceNode( this.sourceFragmentIds = sourceFragmentIds; this.outputs = ImmutableList.copyOf(outputs); this.orderingScheme = requireNonNull(orderingScheme, "orderingScheme is null"); + this.exchangeType = requireNonNull(exchangeType, "exchangeType is null"); } - public RemoteSourceNode(PlanNodeId id, PlanFragmentId sourceFragmentId, List outputs, Optional orderingScheme) + public RemoteSourceNode(PlanNodeId id, PlanFragmentId sourceFragmentId, List outputs, Optional orderingScheme, ExchangeNode.Type exchangeType) { - this(id, ImmutableList.of(sourceFragmentId), outputs, orderingScheme); + this(id, ImmutableList.of(sourceFragmentId), outputs, orderingScheme, exchangeType); } @Override @@ -81,6 +84,12 @@ public Optional getOrderingScheme() return orderingScheme; } + @JsonProperty("exchangeType") + public ExchangeNode.Type getExchangeType() + { + return exchangeType; + } + @Override public R accept(PlanVisitor visitor, C context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index c7dec2b302d62..c1b1b5a566042 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -19,13 +19,19 @@ import com.facebook.presto.cost.CachingStatsProvider; import com.facebook.presto.cost.CostCalculator; import com.facebook.presto.cost.CostProvider; +import com.facebook.presto.cost.FragmentedPlanCostCalculator; +import com.facebook.presto.cost.FragmentedPlanSourceProvider; +import com.facebook.presto.cost.FragmentedPlanStatsCalculator; import com.facebook.presto.cost.PlanNodeCostEstimate; +import com.facebook.presto.cost.PlanNodeSourceProvider; import com.facebook.presto.cost.PlanNodeStatsEstimate; import com.facebook.presto.cost.StatsCalculator; import com.facebook.presto.cost.StatsProvider; import com.facebook.presto.execution.StageInfo; import com.facebook.presto.execution.StageStats; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; import com.facebook.presto.metadata.FunctionRegistry; +import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.OperatorNotFoundException; import com.facebook.presto.metadata.Signature; import com.facebook.presto.metadata.TableHandle; @@ -131,6 +137,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Iterables.getOnlyElement; import static io.airlift.units.DataSize.succinctBytes; import static java.lang.Double.isFinite; @@ -186,39 +193,42 @@ public static String textLogicalPlan(PlanNode plan, TypeProvider types, Function public static String textLogicalPlan(PlanNode plan, TypeProvider types, FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, Session session, int indent, boolean verbose) { - return textLogicalPlan(plan, types, Optional.empty(), functionRegistry, statsCalculator, costCalculator, session, Optional.empty(), indent, verbose); + return textLogicalPlan(plan, types, Optional.empty(), functionRegistry, statsCalculator, costCalculator, PlanNode::getSources, session, Optional.empty(), indent, verbose); } - public static String textLogicalPlan(PlanNode plan, TypeProvider types, Optional stageExecutionStrategy, FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, Session session, Optional> stats, int indent, boolean verbose) + public static String textLogicalPlan(PlanNode plan, TypeProvider types, Optional stageExecutionStrategy, FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, PlanNodeSourceProvider sourceProvider, Session session, Optional> stats, int indent, boolean verbose) { - CachingStatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, types); - CachingCostProvider costProvider = new CachingCostProvider(costCalculator, statsProvider, session, types); + StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, types); + CostProvider costProvider = new CachingCostProvider(costCalculator, statsProvider, session, types, sourceProvider); return new PlanPrinter(plan, types, stageExecutionStrategy, functionRegistry, statsProvider, costProvider, session, stats, indent, verbose).toString(); } - public static String textDistributedPlan(StageInfo outputStageInfo, FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, Session session, boolean verbose) + public static String textDistributedPlan(StageInfo outputStageInfo, FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, Session session, boolean verbose) { StringBuilder builder = new StringBuilder(); List allStages = getAllStages(Optional.of(outputStageInfo)); + List allFragments = allStages.stream() + .map(StageInfo::getPlan) + .collect(toImmutableList()); for (StageInfo stageInfo : allStages) { Map aggregatedStats = aggregatePlanNodeStats(stageInfo); - builder.append(formatFragment(functionRegistry, statsCalculator, costCalculator, session, stageInfo.getPlan(), Optional.of(stageInfo), Optional.of(aggregatedStats), verbose)); + builder.append(formatFragment(functionRegistry, statsCalculator, costCalculator, nodeManager, nodeSchedulerConfig, session, stageInfo.getPlan(), Optional.of(stageInfo), Optional.of(aggregatedStats), verbose, allFragments)); } return builder.toString(); } - public static String textDistributedPlan(SubPlan plan, FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, Session session, boolean verbose) + public static String textDistributedPlan(SubPlan plan, FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, Session session, boolean verbose) { StringBuilder builder = new StringBuilder(); for (PlanFragment fragment : plan.getAllFragments()) { - builder.append(formatFragment(functionRegistry, statsCalculator, costCalculator, session, fragment, Optional.empty(), Optional.empty(), verbose)); + builder.append(formatFragment(functionRegistry, statsCalculator, costCalculator, nodeManager, nodeSchedulerConfig, session, fragment, Optional.empty(), Optional.empty(), verbose, plan.getAllFragments())); } return builder.toString(); } - private static String formatFragment(FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, Session session, PlanFragment fragment, Optional stageInfo, Optional> planNodeStats, boolean verbose) + private static String formatFragment(FunctionRegistry functionRegistry, StatsCalculator statsCalculator, CostCalculator costCalculator, InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, Session session, PlanFragment fragment, Optional stageInfo, Optional> planNodeStats, boolean verbose, List allFragments) { StringBuilder builder = new StringBuilder(); builder.append(format("Fragment %s [%s]\n", @@ -274,14 +284,15 @@ private static String formatFragment(FunctionRegistry functionRegistry, StatsCal } builder.append(indentString(1)).append(format("Grouped Execution: %s\n", fragment.getStageExecutionStrategy().isAnyScanGroupedExecution())); - if (stageInfo.isPresent()) { - builder.append(textLogicalPlan(fragment.getRoot(), TypeProvider.copyOf(fragment.getSymbols()), Optional.of(fragment.getStageExecutionStrategy()), functionRegistry, statsCalculator, costCalculator, session, Optional.of(planNodeStats.get()), 1, verbose)) - .append("\n"); - } - else { - builder.append(textLogicalPlan(fragment.getRoot(), TypeProvider.copyOf(fragment.getSymbols()), Optional.of(fragment.getStageExecutionStrategy()), functionRegistry, statsCalculator, costCalculator, session, Optional.empty(), 1, verbose)) - .append("\n"); - } + TypeProvider typeProvider = TypeProvider.copyOf(allFragments.stream() + .flatMap(f -> f.getSymbols().entrySet().stream()) + .distinct() + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue))); + FragmentedPlanSourceProvider sourceProvider = FragmentedPlanSourceProvider.create(allFragments); + statsCalculator = new FragmentedPlanStatsCalculator(statsCalculator, sourceProvider); + costCalculator = new FragmentedPlanCostCalculator(sourceProvider, costCalculator, nodeManager, nodeSchedulerConfig); + builder.append(textLogicalPlan(fragment.getRoot(), typeProvider, Optional.of(fragment.getStageExecutionStrategy()), functionRegistry, statsCalculator, costCalculator, sourceProvider, session, planNodeStats, 1, verbose)) + .append("\n"); return builder.toString(); } diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index ed1af38e4d6e5..49ccf3c8a067d 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -242,6 +242,7 @@ public class LocalQueryRunner private final boolean alwaysRevokeMemory; private final NodeSpillConfig nodeSpillConfig; + private final NodeSchedulerConfig nodeSchedulerConfig; private boolean printPlan; private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -279,10 +280,11 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, this.typeRegistry = new TypeRegistry(); this.pageSorter = new PagesIndexPageSorter(new PagesIndex.TestingFactory(false)); this.indexManager = new IndexManager(); + this.nodeSchedulerConfig = new NodeSchedulerConfig().setIncludeCoordinator(true); NodeScheduler nodeScheduler = new NodeScheduler( new LegacyNetworkTopology(), nodeManager, - new NodeSchedulerConfig().setIncludeCoordinator(true), + nodeSchedulerConfig, new NodeTaskMap(finalizerService)); this.pageSinkManager = new PageSinkManager(); CatalogManager catalogManager = new CatalogManager(); @@ -827,6 +829,8 @@ public Plan createPlan(Session session, @Language("SQL") String sql, List stats) + { + sourcesStats.putAll(stats); + return this; + } + public StatsCalculatorAssertion check(Consumer statisticsAssertionConsumer) { PlanNodeStatsEstimate statsEstimate = statsCalculator.calculateStats(planNode, this::getSourceStats, noLookup(), session, types); diff --git a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java index 1145187fc9e26..917392fa81f0f 100644 --- a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java +++ b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java @@ -15,16 +15,32 @@ import com.facebook.presto.Session; import com.facebook.presto.connector.ConnectorId; +import com.facebook.presto.execution.NodeTaskMap; +import com.facebook.presto.execution.QueryManagerConfig; +import com.facebook.presto.execution.scheduler.LegacyNetworkTopology; +import com.facebook.presto.execution.scheduler.NodeScheduler; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; +import com.facebook.presto.metadata.CatalogManager; +import com.facebook.presto.metadata.InMemoryNodeManager; +import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.metadata.Signature; import com.facebook.presto.metadata.TableHandle; +import com.facebook.presto.metadata.TableLayoutHandle; +import com.facebook.presto.security.AllowAllAccessControl; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.StandardTypes; import com.facebook.presto.spi.type.Type; +import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.PlanFragmenter; +import com.facebook.presto.sql.planner.SubPlan; import com.facebook.presto.sql.planner.Symbol; import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.plan.AggregationNode; import com.facebook.presto.sql.planner.plan.Assignments; +import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.PlanNode; import com.facebook.presto.sql.planner.plan.PlanNodeId; @@ -37,9 +53,14 @@ import com.facebook.presto.sql.tree.SymbolReference; import com.facebook.presto.tpch.TpchColumnHandle; import com.facebook.presto.tpch.TpchTableHandle; +import com.facebook.presto.tpch.TpchTableLayoutHandle; +import com.facebook.presto.transaction.TransactionManager; +import com.facebook.presto.util.FinalizerService; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.util.Arrays; @@ -54,14 +75,24 @@ import static com.facebook.presto.cost.PlanNodeCostEstimate.cpuCost; import static com.facebook.presto.cost.PlanNodeStatsEstimate.UNKNOWN_STATS; import static com.facebook.presto.metadata.FunctionKind.AGGREGATE; +import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import static com.facebook.presto.sql.planner.iterative.Lookup.noLookup; import static com.facebook.presto.sql.planner.plan.AggregationNode.singleGroupingSet; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.replicatedExchange; +import static com.facebook.presto.testing.TestingSession.createBogusTestingCatalog; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static com.facebook.presto.tpch.TpchTransactionHandle.INSTANCE; +import static com.facebook.presto.transaction.InMemoryTransactionManager.createTestTransactionManager; +import static com.facebook.presto.transaction.TransactionBuilder.transaction; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -72,9 +103,55 @@ public class TestCostCalculator private static final double AVERAGE_ROW_SIZE = 8.; private static final double IS_NULL_OVERHEAD = 9. / AVERAGE_ROW_SIZE; private static final double OFFSET_AND_IS_NULL_OVERHEAD = 13. / AVERAGE_ROW_SIZE; - private final CostCalculator costCalculatorUsingExchanges = new CostCalculatorUsingExchanges(() -> NUMBER_OF_NODES); - private final CostCalculator costCalculatorWithEstimatedExchanges = new CostCalculatorWithEstimatedExchanges(costCalculatorUsingExchanges, () -> NUMBER_OF_NODES); - private Session session = testSessionBuilder().build(); + private CostCalculator costCalculatorUsingExchanges; + private CostCalculator costCalculatorWithEstimatedExchanges; + private PlanFragmenter planFragmenter; + private Session session; + private MetadataManager metadata; + private TransactionManager transactionManager; + private FinalizerService finalizerService; + private NodeScheduler nodeScheduler; + private NodePartitioningManager nodePartitioningManager; + + @BeforeClass + public void setUp() + { + costCalculatorUsingExchanges = new CostCalculatorUsingExchanges(() -> NUMBER_OF_NODES); + costCalculatorWithEstimatedExchanges = new CostCalculatorWithEstimatedExchanges(costCalculatorUsingExchanges, () -> NUMBER_OF_NODES); + planFragmenter = new PlanFragmenter(new QueryManagerConfig()); + + session = testSessionBuilder().setCatalog("tpch").build(); + + CatalogManager catalogManager = new CatalogManager(); + catalogManager.registerCatalog(createBogusTestingCatalog("tpch")); + transactionManager = createTestTransactionManager(catalogManager); + metadata = createTestMetadataManager(transactionManager, new FeaturesConfig()); + + finalizerService = new FinalizerService(); + finalizerService.start(); + nodeScheduler = new NodeScheduler( + new LegacyNetworkTopology(), + new InMemoryNodeManager(), + new NodeSchedulerConfig().setIncludeCoordinator(true), + new NodeTaskMap(finalizerService)); + nodePartitioningManager = new NodePartitioningManager(nodeScheduler); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + costCalculatorUsingExchanges = null; + costCalculatorWithEstimatedExchanges = null; + planFragmenter = null; + session = null; + transactionManager = null; + metadata = null; + finalizerService.destroy(); + finalizerService = null; + nodeScheduler.stop(); + nodeScheduler = null; + nodePartitioningManager = null; + } @Test public void testTableScan() @@ -92,6 +169,11 @@ public void testTableScan() .memory(0) .network(0); + assertCostFragmentedPlan(tableScan, ImmutableMap.of(), ImmutableMap.of("ts", statsEstimate(tableScan, 1000)), types) + .cpu(1000 * IS_NULL_OVERHEAD) + .memory(0) + .network(0); + assertCostHasUnknownComponentsForUnknownStats(tableScan, types); } @@ -118,6 +200,11 @@ public void testProject() .memory(0) .network(0); + assertCostFragmentedPlan(project, costs, stats, types) + .cpu(1000 + 4000 * OFFSET_AND_IS_NULL_OVERHEAD) + .memory(0) + .network(0); + assertCostHasUnknownComponentsForUnknownStats(project, types); } @@ -155,6 +242,11 @@ public void testRepartitionedJoin() .memory(1000 * IS_NULL_OVERHEAD) .network((6000 + 1000) * IS_NULL_OVERHEAD); + assertCostFragmentedPlan(join, costs, stats, types) + .cpu(6000 + 1000 + (12000 + 6000 + 1000) * IS_NULL_OVERHEAD) + .memory(1000 * IS_NULL_OVERHEAD) + .network(0); + assertCostHasUnknownComponentsForUnknownStats(join, types); } @@ -193,6 +285,11 @@ public void testReplicatedJoin() .memory(1000 * NUMBER_OF_NODES * IS_NULL_OVERHEAD) .network(1000 * NUMBER_OF_NODES * IS_NULL_OVERHEAD); + assertCostFragmentedPlan(join, costs, stats, types) + .cpu(1000 + 6000 + (12000 + 6000 + 10000 + 1000 * (NUMBER_OF_NODES - 1)) * IS_NULL_OVERHEAD) + .memory(1000 * NUMBER_OF_NODES * IS_NULL_OVERHEAD) + .network(0); + assertCostHasUnknownComponentsForUnknownStats(join, types); } @@ -220,9 +317,74 @@ public void testAggregation() .memory(13 * IS_NULL_OVERHEAD) .network(6000 * IS_NULL_OVERHEAD); + assertCostFragmentedPlan(aggregation, costs, stats, types) + .cpu((6000) * IS_NULL_OVERHEAD + 6000) + .memory(13 * IS_NULL_OVERHEAD) + .network(0 * IS_NULL_OVERHEAD); + assertCostHasUnknownComponentsForUnknownStats(aggregation, types); } + @Test + public void testRepartitionedJoinWithExchange() + { + TableScanNode ts1 = tableScan("ts1", "orderkey"); + TableScanNode ts2 = tableScan("ts2", "orderkey_0"); + ExchangeNode remoteExchange1 = partitionedExchange(new PlanNodeId("re1"), REMOTE, ts1, ImmutableList.of(new Symbol("orderkey")), Optional.empty()); + ExchangeNode remoteExchange2 = partitionedExchange(new PlanNodeId("re2"), REMOTE, ts2, ImmutableList.of(new Symbol("orderkey_0")), Optional.empty()); + ExchangeNode localExchange = partitionedExchange(new PlanNodeId("le"), LOCAL, remoteExchange2, ImmutableList.of(new Symbol("orderkey_0")), Optional.empty()); + + JoinNode join = join("join", + remoteExchange1, + localExchange, + JoinNode.DistributionType.PARTITIONED, + "orderkey", + "orderkey_0"); + + Map stats = ImmutableMap.builder() + .put("join", statsEstimate(join, 12000)) + .put("re1", statsEstimate(remoteExchange1, 10000)) + .put("re2", statsEstimate(remoteExchange2, 10000)) + .put("le", statsEstimate(localExchange, 6000)) + .put("ts1", statsEstimate(ts1, 6000)) + .put("ts2", statsEstimate(ts2, 1000)) + .build(); + Map types = ImmutableMap.of( + "orderkey", BIGINT, + "orderkey_0", BIGINT); + + assertFragmentedEqualsUnfragmented(join, stats, types); + } + + @Test + public void testReplicatedJoinWithExchange() + { + TableScanNode ts1 = tableScan("ts1", "orderkey"); + TableScanNode ts2 = tableScan("ts2", "orderkey_0"); + ExchangeNode remoteExchange2 = replicatedExchange(new PlanNodeId("re2"), REMOTE, ts2); + ExchangeNode localExchange = partitionedExchange(new PlanNodeId("le"), LOCAL, remoteExchange2, ImmutableList.of(new Symbol("orderkey_0")), Optional.empty()); + + JoinNode join = join("join", + ts1, + localExchange, + JoinNode.DistributionType.REPLICATED, + "orderkey", + "orderkey_0"); + + Map stats = ImmutableMap.builder() + .put("join", statsEstimate(join, 12000)) + .put("re2", statsEstimate(remoteExchange2, 10000)) + .put("le", statsEstimate(localExchange, 6000)) + .put("ts1", statsEstimate(ts1, 6000)) + .put("ts2", statsEstimate(ts2, 1000)) + .build(); + Map types = ImmutableMap.of( + "orderkey", BIGINT, + "orderkey_0", BIGINT); + + assertFragmentedEqualsUnfragmented(join, stats, types); + } + private CostAssertionBuilder assertCost( PlanNode node, Map costs, @@ -241,6 +403,20 @@ private CostAssertionBuilder assertCostEstimatedExchanges( return assertCost(costCalculatorWithEstimatedExchanges, node, costs, stats, types); } + private CostAssertionBuilder assertCostFragmentedPlan( + PlanNode node, + Map costs, + Map stats, + Map types) + { + TypeProvider typeProvider = TypeProvider.copyOf(types.entrySet().stream() + .collect(ImmutableMap.toImmutableMap(entry -> new Symbol(entry.getKey()), Map.Entry::getValue))); + SubPlan subPlan = fragment(new Plan(node, typeProvider)); + FragmentedPlanSourceProvider sourceProvider = FragmentedPlanSourceProvider.create(subPlan.getAllFragments()); + FragmentedPlanCostCalculator costCalculator = new FragmentedPlanCostCalculator(costCalculatorUsingExchanges, sourceProvider, () -> NUMBER_OF_NODES); + return assertCost(costCalculator, node, costs, stats, types); + } + private CostAssertionBuilder assertCost( CostCalculator costCalculator, PlanNode node, @@ -275,6 +451,20 @@ private void assertCostHasUnknownComponentsForUnknownStats(PlanNode node, Map stats, Map types) + { + StatsCalculator statsCalculator = statsCalculator(stats); + PlanNodeCostEstimate costWithExchanges = calculateCumulativeCost(node, costCalculatorUsingExchanges, statsCalculator, types); + PlanNodeCostEstimate costWithFragments = calculateCumulativeCostFragmentedPlan(node, statsCalculator, types); + assertEquals(costWithExchanges, costWithFragments); + } + + private StatsCalculator statsCalculator(Map stats) + { + return (node, sourceStats, lookup, session, types) -> + requireNonNull(stats.get(node.getId().toString()), "no stats for node"); + } + private PlanNodeCostEstimate calculateCumulativeCost( CostCalculator costCalculator, PlanNode node, @@ -291,11 +481,31 @@ private PlanNodeCostEstimate calculateCumulativeCost( .collect(ImmutableMap.toImmutableMap(entry -> new Symbol(entry.getKey()), Map.Entry::getValue)))); PlanNodeCostEstimate sourcesCost = node.getSources().stream() - .map(source -> requireNonNull(costs.apply(source), "no cost for source")) + .map(source -> requireNonNull(costs.apply(source), format("no cost for source: %s", source.getId()))) .reduce(ZERO_COST, PlanNodeCostEstimate::add); return sourcesCost.add(localCost); } + private PlanNodeCostEstimate calculateCumulativeCost(PlanNode node, CostCalculator costCalculator, StatsCalculator statsCalculator, Map types) + { + TypeProvider typeProvider = TypeProvider.copyOf(types.entrySet().stream() + .collect(ImmutableMap.toImmutableMap(entry -> new Symbol(entry.getKey()), Map.Entry::getValue))); + StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, typeProvider); + CostProvider costProvider = new CachingCostProvider(costCalculator, statsProvider, Optional.empty(), noLookup(), session, typeProvider); + return costProvider.getCumulativeCost(node); + } + + private PlanNodeCostEstimate calculateCumulativeCostFragmentedPlan(PlanNode node, StatsCalculator statsCalculator, Map types) + { + TypeProvider typeProvider = TypeProvider.copyOf(types.entrySet().stream() + .collect(ImmutableMap.toImmutableMap(entry -> new Symbol(entry.getKey()), Map.Entry::getValue))); + SubPlan subPlan = fragment(new Plan(node, typeProvider)); + FragmentedPlanSourceProvider sourceProvider = FragmentedPlanSourceProvider.create(subPlan.getAllFragments()); + statsCalculator = new FragmentedPlanStatsCalculator(statsCalculator, sourceProvider); + CostCalculator costCalculator = new FragmentedPlanCostCalculator(costCalculatorUsingExchanges, sourceProvider, () -> NUMBER_OF_NODES); + return calculateCumulativeCost(node, costCalculator, statsCalculator, types); + } + private static class CostAssertionBuilder { private final PlanNodeCostEstimate actual; @@ -364,12 +574,13 @@ private TableScanNode tableScan(String id, String... symbols) assignments.put(symbol, new TpchColumnHandle("orderkey", BIGINT)); } + TpchTableHandle tableHandle = new TpchTableHandle("local", "orders", 1.0); return new TableScanNode( new PlanNodeId(id), new TableHandle(new ConnectorId("tpch"), new TpchTableHandle("local", "orders", 1.0)), symbolsList, assignments.build(), - Optional.empty(), + Optional.of(new TableLayoutHandle(new ConnectorId("tpch"), INSTANCE, new TpchTableLayoutHandle(tableHandle, TupleDomain.all()))), TupleDomain.all()); } @@ -427,4 +638,20 @@ private JoinNode join(String planNodeId, PlanNode left, PlanNode right, JoinNode Optional.empty(), Optional.of(distributionType)); } + + private SubPlan fragment(Plan plan) + { + return inTransaction(session -> planFragmenter.createSubPlans(session, metadata, nodePartitioningManager, plan, false)); + } + + private T inTransaction(Function transactionSessionConsumer) + { + return transaction(transactionManager, new AllowAllAccessControl()) + .singleStatement() + .execute(session, session -> { + // metadata.getCatalogHandle() registers the catalog for the transaction + session.getCatalog().ifPresent(catalog -> metadata.getCatalogHandle(session, catalog)); + return transactionSessionConsumer.apply(session); + }); + } } diff --git a/presto-main/src/test/java/com/facebook/presto/cost/TestFragmentedPlanStatsCalculator.java b/presto-main/src/test/java/com/facebook/presto/cost/TestFragmentedPlanStatsCalculator.java new file mode 100644 index 0000000000000..2d5a3717e5d66 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/cost/TestFragmentedPlanStatsCalculator.java @@ -0,0 +1,209 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.cost; + +import com.facebook.presto.Session; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.sql.planner.Partitioning; +import com.facebook.presto.sql.planner.PartitioningHandle; +import com.facebook.presto.sql.planner.PartitioningScheme; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.PlanNodeIdAllocator; +import com.facebook.presto.sql.planner.Symbol; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder; +import com.facebook.presto.sql.planner.plan.PlanFragmentId; +import com.facebook.presto.sql.planner.plan.PlanNode; +import com.facebook.presto.tpch.TpchPartitioningHandle; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; +import static com.facebook.presto.operator.StageExecutionStrategy.ungroupedExecution; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Collections.emptyList; +import static java.util.Objects.requireNonNull; + +public class TestFragmentedPlanStatsCalculator +{ + private Metadata metadata; + private Session session; + + @BeforeClass + public void setUp() + { + this.metadata = createTestMetadataManager(); + this.session = testSessionBuilder().build(); + } + + @AfterClass + public void tearDown() + { + this.metadata = null; + this.session = null; + } + + @Test + public void testRemoteSourceNodeStats() + { + PlanBuilder pb = new PlanBuilder(new PlanNodeIdAllocator(), metadata); + + PlanNode values1 = pb.values(pb.symbol("i11", BIGINT), pb.symbol("i12", BIGINT), pb.symbol("i13", BIGINT), pb.symbol("i14", BIGINT)); + PlanFragment fragment1 = planFragment("f1", values1, pb.getTypes().allTypes(), values1.getOutputSymbols()); + PlanNodeStatsEstimate values1Stats = PlanNodeStatsEstimate.builder() + .setOutputRowCount(10) + .addSymbolStatistics(new Symbol("i11"), SymbolStatsEstimate.builder() + .setLowValue(1) + .setHighValue(10) + .setDistinctValuesCount(5) + .setNullsFraction(0.3) + .build()) + .addSymbolStatistics(new Symbol("i12"), SymbolStatsEstimate.builder() + .setLowValue(0) + .setHighValue(3) + .setDistinctValuesCount(4) + .setNullsFraction(0) + .build()) + .addSymbolStatistics(new Symbol("i13"), SymbolStatsEstimate.builder() + .setLowValue(10) + .setHighValue(15) + .setDistinctValuesCount(4) + .setNullsFraction(0.1) + .build()) + .addSymbolStatistics(new Symbol("i14"), SymbolStatsEstimate.builder() + .setLowValue(10) + .setHighValue(15) + .setDistinctValuesCount(4) + .setNullsFraction(0.1) + .build()) + .build(); + + PlanNode values2 = pb.values(pb.symbol("i21", BIGINT), pb.symbol("i22", BIGINT), pb.symbol("i23", BIGINT), pb.symbol("i24", BIGINT)); + PlanFragment fragment2 = planFragment("f2", values2, pb.getTypes().allTypes(), values2.getOutputSymbols()); + PlanNodeStatsEstimate values2Stats = PlanNodeStatsEstimate.builder() + .setOutputRowCount(20) + .addSymbolStatistics(new Symbol("i21"), SymbolStatsEstimate.builder() + .setLowValue(11) + .setHighValue(20) + .setNullsFraction(0.4) + .build()) + .addSymbolStatistics(new Symbol("i22"), SymbolStatsEstimate.builder() + .setLowValue(2) + .setHighValue(7) + .setDistinctValuesCount(3) + .build()) + .addSymbolStatistics(new Symbol("i23"), SymbolStatsEstimate.builder() + .setDistinctValuesCount(6) + .setNullsFraction(0.2) + .build()) + .addSymbolStatistics(new Symbol("i24"), SymbolStatsEstimate.builder() + .setLowValue(10) + .setHighValue(15) + .setDistinctValuesCount(4) + .setNullsFraction(0.1) + .build()) + .build(); + + Map stats = ImmutableMap.builder() + .put(values1, values1Stats) + .put(values2, values2Stats) + .build(); + + List fragments = ImmutableList.of(fragment1, fragment2); + ImmutableList fragmentIds = fragments.stream().map(PlanFragment::getId).collect(toImmutableList()); + PlanNode remoteSourceNode = pb.remoteSourceNode(fragmentIds, + ImmutableList.of(pb.symbol("o1", BIGINT), + pb.symbol("o2", BIGINT), + pb.symbol("o3", BIGINT), + pb.symbol("o4", BIGINT)), + REPARTITION); + + assertStatsFor(remoteSourceNode, stats, fragments, pb.getTypes()) + .check(check -> check + .outputRowsCount(30) + .symbolStats("o1", assertion -> assertion + .lowValue(1) + .highValue(20) + .distinctValuesCount(5) + .nullsFraction(0.3666666)) + .symbolStats("o2", assertion -> assertion + .lowValue(0) + .highValue(7) + .distinctValuesCount(3) + .nullsFractionUnknown()) + .symbolStats("o3", assertion -> assertion + .lowValueUnknown() + .highValueUnknown() + .distinctValuesCount(4.0) + .nullsFraction(0.1666667)) + .symbolStats("o4", assertion -> assertion + .lowValue(10) + .highValue(15) + .distinctValuesCount(4) + .nullsFraction(0.1))); + } + + private PlanFragment planFragment(String id, PlanNode root, Map types, List outputLayout) + { + PartitioningHandle partitioningHandle = new PartitioningHandle(Optional.empty(), Optional.empty(), new TpchPartitioningHandle("dummy", 100)); + return new PlanFragment(new PlanFragmentId(id), + root, + types, + partitioningHandle, + emptyList(), + new PartitioningScheme(Partitioning.create(partitioningHandle, emptyList()), outputLayout), + ungroupedExecution()); + } + + private StatsCalculatorAssertion assertStatsFor(PlanNode node, Map stats, List fragments, TypeProvider types) + { + StatsCalculator delegate = statsCalculator(stats); + FragmentedPlanSourceProvider sourceProvider = FragmentedPlanSourceProvider.create(fragments); + FragmentedPlanStatsCalculator statsCalculator = new FragmentedPlanStatsCalculator(delegate, sourceProvider); + + return new StatsCalculatorAssertion(statsCalculator, session, node, types) + .withSourceStats(stats); + } + + private StatsCalculator statsCalculator(Map stats) + { + return (node, sourceStats, lookup, session, types) -> + requireNonNull(stats.get(node), "no stats for node"); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java index b0d421987de61..8c9ee00b447db 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java @@ -19,6 +19,7 @@ import com.facebook.presto.block.BlockEncodingManager; import com.facebook.presto.client.NodeVersion; import com.facebook.presto.connector.ConnectorId; +import com.facebook.presto.cost.ComposableStatsCalculator; import com.facebook.presto.event.query.QueryMonitor; import com.facebook.presto.event.query.QueryMonitorConfig; import com.facebook.presto.eventlistener.EventListenerManager; @@ -73,6 +74,7 @@ import java.util.OptionalInt; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; +import static com.facebook.presto.cost.PlanNodeCostEstimate.UNKNOWN_COST; import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; @@ -177,6 +179,10 @@ public static QueryMonitor createTestQueryMonitor() new NodeVersion("testVersion"), new SessionPropertyManager(), metadata, - new QueryMonitorConfig()); + new QueryMonitorConfig(), + new ComposableStatsCalculator(ImmutableList.of()), + (node, stats, lookup, session, types) -> UNKNOWN_COST, + new InMemoryNodeManager(), + new NodeSchedulerConfig().setIncludeCoordinator(true)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java index 8c29ef775f415..27d697592ec0b 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java @@ -45,6 +45,8 @@ import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED; import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED; import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER; @@ -156,7 +158,8 @@ private static PlanFragment createExchangePlanFragment(String name, PlanFragment .map(PlanFragment::getId) .collect(toImmutableList()), fragments[0].getPartitioningScheme().getOutputLayout(), - Optional.empty()); + Optional.empty(), + REPARTITION); return createFragment(planNode); } @@ -166,7 +169,7 @@ private static PlanFragment createUnionPlanFragment(String name, PlanFragment... PlanNode planNode = new UnionNode( new PlanNodeId(name + "_id"), Stream.of(fragments) - .map(fragment -> new RemoteSourceNode(new PlanNodeId(fragment.getId().toString()), fragment.getId(), fragment.getPartitioningScheme().getOutputLayout(), Optional.empty())) + .map(fragment -> new RemoteSourceNode(new PlanNodeId(fragment.getId().toString()), fragment.getId(), fragment.getPartitioningScheme().getOutputLayout(), Optional.empty(), REPARTITION)) .collect(toImmutableList()), ImmutableListMultimap.of(), ImmutableList.of()); @@ -185,7 +188,7 @@ private static PlanFragment createBroadcastJoinPlanFragment(String name, PlanFra Optional.empty(), TupleDomain.all()); - RemoteSourceNode remote = new RemoteSourceNode(new PlanNodeId("build_id"), buildFragment.getId(), ImmutableList.of(), Optional.empty()); + RemoteSourceNode remote = new RemoteSourceNode(new PlanNodeId("build_id"), buildFragment.getId(), ImmutableList.of(), Optional.empty(), REPLICATE); PlanNode join = new JoinNode( new PlanNodeId(name + "_id"), INNER, @@ -206,8 +209,8 @@ private static PlanFragment createBroadcastJoinPlanFragment(String name, PlanFra private static PlanFragment createJoinPlanFragment(JoinNode.Type joinType, String name, PlanFragment buildFragment, PlanFragment probeFragment) { - RemoteSourceNode probe = new RemoteSourceNode(new PlanNodeId("probe_id"), probeFragment.getId(), ImmutableList.of(), Optional.empty()); - RemoteSourceNode build = new RemoteSourceNode(new PlanNodeId("build_id"), buildFragment.getId(), ImmutableList.of(), Optional.empty()); + RemoteSourceNode probe = new RemoteSourceNode(new PlanNodeId("probe_id"), probeFragment.getId(), ImmutableList.of(), Optional.empty(), REPARTITION); + RemoteSourceNode build = new RemoteSourceNode(new PlanNodeId("build_id"), buildFragment.getId(), ImmutableList.of(), Optional.empty(), REPARTITION); PlanNode planNode = new JoinNode( new PlanNodeId(name + "_id"), joinType, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java index 45d02c5b1403c..779b1e594956b 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java @@ -81,6 +81,7 @@ import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER; import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.concurrent.Threads.daemonThreadsNamed; @@ -435,7 +436,7 @@ private static StageExecutionPlan createPlan(ConnectorSplitSource splitSource) Optional.empty(), TupleDomain.all()); - RemoteSourceNode remote = new RemoteSourceNode(new PlanNodeId("remote_id"), new PlanFragmentId("plan_fragment_id"), ImmutableList.of(), Optional.empty()); + RemoteSourceNode remote = new RemoteSourceNode(new PlanNodeId("remote_id"), new PlanFragmentId("plan_fragment_id"), ImmutableList.of(), Optional.empty(), GATHER); PlanFragment testFragment = new PlanFragment( new PlanFragmentId("plan_id"), new JoinNode(new PlanNodeId("join_id"), diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index f92016afb360b..47bf96a0cff8f 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -53,9 +53,11 @@ import com.facebook.presto.sql.planner.plan.LimitNode; import com.facebook.presto.sql.planner.plan.MarkDistinctNode; import com.facebook.presto.sql.planner.plan.OutputNode; +import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.PlanNode; import com.facebook.presto.sql.planner.plan.PlanNodeId; import com.facebook.presto.sql.planner.plan.ProjectNode; +import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.SampleNode; import com.facebook.presto.sql.planner.plan.SemiJoinNode; import com.facebook.presto.sql.planner.plan.TableFinishNode; @@ -702,6 +704,11 @@ public WindowNode window(WindowNode.Specification specification, Map fragmentIds, List symbols, ExchangeNode.Type exchangeType) + { + return new RemoteSourceNode(idAllocator.getNextId(), fragmentIds, symbols, Optional.empty(), exchangeType); + } + public static Expression expression(String sql) { return ExpressionUtils.rewriteIdentifiersToSymbolReferences(new SqlParser().createExpression(sql)); diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java index 8cb8f495f58e5..31c96e86e7c36 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java @@ -19,6 +19,8 @@ import com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges; import com.facebook.presto.cost.CostComparator; import com.facebook.presto.execution.QueryManagerConfig; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; +import com.facebook.presto.metadata.InMemoryNodeManager; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.spi.security.AccessDeniedException; import com.facebook.presto.spi.type.Type; @@ -333,6 +335,8 @@ private QueryExplainer getQueryExplainer() sqlParser, queryRunner.getStatsCalculator(), costCalculator, + new InMemoryNodeManager(), + new NodeSchedulerConfig().setIncludeCoordinator(true), ImmutableMap.of()); }