Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,29 @@ public class CachingCostProvider
private final Lookup lookup;
private final Session session;
private final TypeProvider types;
private final PlanNodeSourceProvider sourceProvider;

private final Map<PlanNode, PlanNodeCostEstimate> cache = new IdentityHashMap<>();

public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Session session, TypeProvider types)
public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Optional<Memo> memo, Lookup lookup, Session session, TypeProvider types)
{
this(costCalculator, statsProvider, Optional.empty(), noLookup(), session, types);
this(costCalculator, statsProvider, memo, lookup, session, types, PlanNode::getSources);
}

public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Optional<Memo> memo, Lookup lookup, Session session, TypeProvider types)
public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Session session, TypeProvider types, PlanNodeSourceProvider sourceProvider)
{
this(costCalculator, statsProvider, Optional.empty(), noLookup(), session, types, sourceProvider);
}

public CachingCostProvider(CostCalculator costCalculator, StatsProvider statsProvider, Optional<Memo> memo, Lookup lookup, Session session, TypeProvider types, PlanNodeSourceProvider sourceProvider)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check this PR for potential conflicts with #11267?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't overlap i don't think.

{
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.statsProvider = requireNonNull(statsProvider, "statsProvider is null");
this.memo = requireNonNull(memo, "memo is null");
this.lookup = requireNonNull(lookup, "lookup is null");
this.session = requireNonNull(session, "session is null");
this.types = requireNonNull(types, "types is null");
this.sourceProvider = requireNonNull(sourceProvider, "sourceProvider is null");
}

@Override
Expand Down Expand Up @@ -95,7 +102,7 @@ private PlanNodeCostEstimate calculateCumulativeCost(PlanNode node)
{
PlanNodeCostEstimate localCosts = costCalculator.calculateCost(node, statsProvider, lookup, session, types);

PlanNodeCostEstimate sourcesCost = node.getSources().stream()
PlanNodeCostEstimate sourcesCost = sourceProvider.getSources(node).stream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can sourceProvider be modelled with a lookup?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not without abusing the interface :). Lookups are for resolving GroupReferences to the nodes that are a part of that group. A SourceProvider gives you the list of source nodes for a given node.

.map(this::getCumulativeCost)
.reduce(ZERO_COST, PlanNodeCostEstimate::add);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;

import java.util.function.IntSupplier;

import static com.facebook.presto.cost.CostCalculatorUsingExchanges.calculateExchangeCost;
import static com.facebook.presto.cost.CostCalculatorUsingExchanges.currentNumberOfWorkerNodes;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE;
import static java.util.Objects.requireNonNull;

public class FragmentedPlanCostCalculator
implements CostCalculator
{
private final CostCalculator delegate;
private final FragmentedPlanSourceProvider sourceProvider;
private final IntSupplier numberOfNodes;

public FragmentedPlanCostCalculator(FragmentedPlanSourceProvider sourceProvider, CostCalculator delegate, InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig)
{
this(delegate, sourceProvider, currentNumberOfWorkerNodes(nodeSchedulerConfig.isIncludeCoordinator(), nodeManager));
}

public FragmentedPlanCostCalculator(CostCalculator delegate, FragmentedPlanSourceProvider sourceProvider, IntSupplier numberOfNodes)
{
this.sourceProvider = requireNonNull(sourceProvider, "sourceProvider is null");
this.delegate = requireNonNull(delegate, "delegate is null");
this.numberOfNodes = requireNonNull(numberOfNodes, "numberOfNodes is null");
}

@Override
public PlanNodeCostEstimate calculateCost(PlanNode node, StatsProvider stats, Lookup lookup, Session session, TypeProvider types)
{
if (node instanceof RemoteSourceNode) {
return calculateRemoteSourceNodeCost((RemoteSourceNode) node, stats, types);
}
else {
return delegate.calculateCost(node, stats, lookup, session, types);
}
}

private PlanNodeCostEstimate calculateRemoteSourceNodeCost(RemoteSourceNode node, StatsProvider stats, TypeProvider types)
{
PlanNodeCostEstimate costEstimate = PlanNodeCostEstimate.ZERO_COST;
ExchangeNode.Type exchangeType = node.getExchangeType();
for (PlanNode source : sourceProvider.getSources(node)) {
PlanNodeCostEstimate exchangeCost = calculateExchangeCost(numberOfNodes.getAsInt(), stats.getStats(source), node.getOutputSymbols(), exchangeType, REMOTE, types);
costEstimate.add(exchangeCost);
}
return costEstimate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.cost;

import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

public class FragmentedPlanSourceProvider
implements PlanNodeSourceProvider
{
private final Map<PlanFragmentId, PlanFragment> fragments;

public static FragmentedPlanSourceProvider create(List<PlanFragment> planFragments)
{
Map<PlanFragmentId, PlanFragment> fragmentIdPlanNodeMap = planFragments.stream()
.collect(toImmutableMap(PlanFragment::getId, fragment -> fragment));
return new FragmentedPlanSourceProvider(fragmentIdPlanNodeMap);
}

private FragmentedPlanSourceProvider(Map<PlanFragmentId, PlanFragment> fragments)
{
this.fragments = ImmutableMap.copyOf(requireNonNull(fragments, "fragments is null"));
}

@Override
public List<PlanNode> getSources(PlanNode node)
{
if (node instanceof RemoteSourceNode) {
return ((RemoteSourceNode) node).getSourceFragmentIds().stream()
.map(id -> {
verify(fragments.containsKey(id), "fragment id not in map: %s", id);
return fragments.get(id).getRoot();
})
.collect(toImmutableList());
}

return node.getSources();
}

public List<PlanFragment> getSourceFragments(RemoteSourceNode node)
{
return node.getSourceFragmentIds().stream()
.map(id -> {
verify(fragments.containsKey(id), "fragment id not in map: %s", id);
return fragments.get(id);
})
.collect(toImmutableList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double licence


package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.Symbol;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;

import java.util.List;

import static com.facebook.presto.cost.PlanNodeStatsEstimateMath.addStatsAndMaxDistinctValues;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;

public class FragmentedPlanStatsCalculator
implements StatsCalculator
{
private final StatsCalculator delegate;
private final FragmentedPlanSourceProvider sourceProvider;

public FragmentedPlanStatsCalculator(StatsCalculator delegate, FragmentedPlanSourceProvider sourceProvider)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inject PlanSourceProvider

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be created new for each query.

{
this.delegate = requireNonNull(delegate, "delegate is null");
this.sourceProvider = requireNonNull(sourceProvider, "source provider is null");
}

@Override
public PlanNodeStatsEstimate calculateStats(PlanNode node, StatsProvider sourceStats, Lookup lookup, Session session, TypeProvider types)
{
if (node instanceof RemoteSourceNode) {
return calculateRemoteSourceStats((RemoteSourceNode) node, sourceStats);
}
return delegate.calculateStats(node, sourceStats, lookup, session, types);
}

private PlanNodeStatsEstimate calculateRemoteSourceStats(RemoteSourceNode node, StatsProvider statsProvider)
{
PlanNodeStatsEstimate estimate = null;
for (PlanFragment sourceFragment : sourceProvider.getSourceFragments(node)) {
PlanNodeStatsEstimate sourceStatsWithMappedSymbols = mapToOutputSymbols(statsProvider.getStats(sourceFragment.getRoot()), sourceFragment.getPartitioningScheme().getOutputLayout(), node.getOutputSymbols());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe FragmentedPlanSourceProvider::getSources for RemoteSourceNode should return nodes with output symbols already mapped? That way you wouldn't need to map symbols here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that would mess up getting the stats for those sources.


if (estimate != null) {
estimate = addStatsAndMaxDistinctValues(estimate, sourceStatsWithMappedSymbols);
}
else {
estimate = sourceStatsWithMappedSymbols;
}
}

verify(estimate != null, "estimate is null");
return estimate;
}

private PlanNodeStatsEstimate mapToOutputSymbols(PlanNodeStatsEstimate estimate, List<Symbol> inputs, List<Symbol> outputs)
{
checkArgument(inputs.size() == outputs.size(), "Input symbols count does not match output symbols count");
PlanNodeStatsEstimate.Builder mapped = PlanNodeStatsEstimate.builder()
.setOutputRowCount(estimate.getOutputRowCount());

for (int i = 0; i < inputs.size(); i++) {
mapped.addSymbolStatistics(outputs.get(i), estimate.getSymbolStatistics(inputs.get(i)));
}

return mapped.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.cost;

import com.facebook.presto.sql.planner.plan.PlanNode;

import java.util.List;

@FunctionalInterface
public interface PlanNodeSourceProvider
{
List<PlanNode> getSources(PlanNode node);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.cost.CostCalculator;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.execution.Column;
import com.facebook.presto.execution.ExecutionFailureInfo;
Expand All @@ -25,7 +27,9 @@
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.FunctionRegistry;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.operator.DriverStats;
Expand Down Expand Up @@ -66,8 +70,6 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static com.facebook.presto.cost.PlanNodeCostEstimate.UNKNOWN_COST;
import static com.facebook.presto.cost.PlanNodeStatsEstimate.UNKNOWN_STATS;
import static com.facebook.presto.sql.planner.planPrinter.PlanPrinter.textDistributedPlan;
import static java.lang.Math.max;
import static java.lang.Math.toIntExact;
Expand All @@ -88,6 +90,10 @@ public class QueryMonitor
private final SessionPropertyManager sessionPropertyManager;
private final FunctionRegistry functionRegistry;
private final int maxJsonLimit;
private final StatsCalculator statsCalculator;
private final CostCalculator costCalculator;
private final InternalNodeManager nodeManager;
private final NodeSchedulerConfig nodeSchedulerConfig;

@Inject
public QueryMonitor(
Expand All @@ -98,7 +104,11 @@ public QueryMonitor(
NodeVersion nodeVersion,
SessionPropertyManager sessionPropertyManager,
Metadata metadata,
QueryMonitorConfig config)
QueryMonitorConfig config,
StatsCalculator statsCalculator,
CostCalculator costCalculator,
InternalNodeManager nodeManager,
NodeSchedulerConfig nodeSchedulerConfig)
{
this.eventListenerManager = requireNonNull(eventListenerManager, "eventListenerManager is null");
this.stageInfoCodec = requireNonNull(stageInfoCodec, "stageInfoCodec is null");
Expand All @@ -109,6 +119,10 @@ public QueryMonitor(
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.functionRegistry = requireNonNull(metadata, "metadata is null").getFunctionRegistry();
this.maxJsonLimit = toIntExact(requireNonNull(config, "config is null").getMaxOutputStageJsonSize().toBytes());
this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.nodeSchedulerConfig = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null");
}

public void queryCreatedEvent(QueryInfo queryInfo)
Expand Down Expand Up @@ -199,12 +213,13 @@ public void queryCompletedEvent(QueryInfo queryInfo)
Optional<String> plan = Optional.empty();
try {
if (queryInfo.getOutputStage().isPresent()) {
// Stats and costs are suppress, since transaction is already completed
plan = Optional.of(textDistributedPlan(
queryInfo.getOutputStage().get(),
functionRegistry,
(node, sourceStats, lookup, session, types) -> UNKNOWN_STATS,
(node, stats, lookup, session, types) -> UNKNOWN_COST,
statsCalculator,
costCalculator,
nodeManager,
nodeSchedulerConfig,
queryInfo.getSession().toSession(sessionPropertyManager),
false));
}
Expand Down
Loading