Propagate stats and cost in distributed explain plans#11268
Propagate stats and cost in distributed explain plans#11268rschlussel merged 4 commits intoprestodb:masterfrom
Conversation
There was a problem hiding this comment.
Changing to planFragmentsMap. I don't like when maps are named without the word map (or something that makes it clear it's a mapping). I always expect them to be a list or a set (or any other collection of just one thing).
There was a problem hiding this comment.
Heh... I actually dislike when variable names encode their type (unless there's a very good reason for that, such as a conflict with some other variable -- e.g., a when a builder is used, or when having to convert types and you need to hold on to both versions). In this particular case, fragmentPlanNodeMap or planFragmentsMap doesn't even convey what it represents. It's even ambiguous, since the main entity it holds is plan nodes (indexed by which fragment they belong to). I would suggest sourcesByFragment, nodesByFragment, sourceNodesByFragment, fragmentToSource or something like that.
There was a problem hiding this comment.
s/createFragmentedPlanSourceProvider/create
Also I would put it at the top, above the constructor
There was a problem hiding this comment.
should not this method be added as a regular rule to com.facebook.presto.cost.ComposableStatsCalculator?
There was a problem hiding this comment.
I did it that way at first but it doesn't work (at least not neatly) because it needs the state from the specific query to be able to get the sources.
There was a problem hiding this comment.
move it to the top, just below the constructor
There was a problem hiding this comment.
RemoteSourceStatsCalculator?
There was a problem hiding this comment.
it's a statscalculator for the whole fragmented plan. it just handles a remote source node.
There was a problem hiding this comment.
Inject PlanSourceProvider
There was a problem hiding this comment.
It needs to be created new for each query.
There was a problem hiding this comment.
I'll work on that tomorrow
There was a problem hiding this comment.
indent used to be 2 here. Now, if i'm following correctly, it is 0
There was a problem hiding this comment.
yeah but it was equally readable as zero, so it didn't seem worth having its own specialized constructor.
There was a problem hiding this comment.
Move this constructor after the very next one.
There was a problem hiding this comment.
ImmutableMap.copyOf(requireNonNull(fragmentPlanNodeMap, "fragmentPlanNodeMap is null"))
There was a problem hiding this comment.
Move it before the constructor
There was a problem hiding this comment.
verify(estimate!=null, "estimate is absent")
There was a problem hiding this comment.
Keep it after calculateRemoteSourceStats after you move one
There was a problem hiding this comment.
I like functional programming. But this seems like an abuse of one. I think something like this is way more readable
if(estimate != null){
estmate = addStatsAndMaxDistinctValues(estmate, currentSourceEstimate)
} else {
estimate = currentSourceEstimate;
}
There was a problem hiding this comment.
PlanNodeStatsEstimate currentSourceEstimate = mapToOutputSymbols(statsProvider.getStats(source), source.getOutputSymbols(), node.getOutputSymbols())
e8af73c to
ae7f27f
Compare
|
Addressed comments except writing tests and updated how PlanFragmentStatsCalculator gets the input symbols to fix the failing explain analyze tests. |
There was a problem hiding this comment.
Maybe FragmentedPlanSourceProvider::getSources for RemoteSourceNode should return nodes with output symbols already mapped? That way you wouldn't need to map symbols here.
There was a problem hiding this comment.
i think that would mess up getting the stats for those sources.
There was a problem hiding this comment.
Add a comment here explaining why remote sources are ignored in cost estimation.
of course, throwing would be safer here, in case cost calculator is used on actual plan somewhere else than in EXPLAIN.
There was a problem hiding this comment.
I was just making this a pass through so we could get the costs for other things, but now I'm thinking that we'll lose the exchange cost. Maybe I should make a FragmentCostCalculator extension to get the exchange properties from the fragment and then call calculateExchangeCost
There was a problem hiding this comment.
What about different approach -- that we de-fragment the plan and pass that to the cost calculator? Then the stats & costing code would not need to be modified.
There was a problem hiding this comment.
I was just making this a pass through so we could get the costs for other things, but now I'm thinking that we'll lose the exchange cost.
I didn't notice. The problem is that it's possible for "cost on fragmented plan" to be very off, and it may go unnoticed for long time: are we able to test-cover everything? will we notice in production when costs are off? (ultimately we will, but not too soon, as the costing here won't have any impact on execution)
There was a problem hiding this comment.
I had looked at defragmenting and computing the cost, but it gets very messy in the plan printer because each node you visit needs to have the defragmented plan and match up correctly (or re-defragment), etc.
I'm going to go with the FragmentedPlanCostCalculator approach for handling RemoteSourceNodes and I'll add a test that creates a plan and computes the cost and then fragments the plan and checks that the cumulative cost is the same.
There was a problem hiding this comment.
Can sourceProvider be modelled with a lookup?
There was a problem hiding this comment.
not without abusing the interface :). Lookups are for resolving GroupReferences to the nodes that are a part of that group. A SourceProvider gives you the list of source nodes for a given node.
There was a problem hiding this comment.
Did you check this PR for potential conflicts with #11267?
There was a problem hiding this comment.
doesn't overlap i don't think.
336d56e to
503cb2f
Compare
|
Updated:
|
|
Also @arhimondr. i didn't get rid of the stats calculator interface because it's useful for testing. |
503cb2f to
ddbf812
Compare
There was a problem hiding this comment.
nit: i would make the delegate field to be the first field and the first parameter in the constructor. You can do this easily by applying automatic refactoring.
There was a problem hiding this comment.
This seems to be fragile. How about storing the exchange type in the RemoteSourceNode? It should be an easy change.
There was a problem hiding this comment.
You can simply call it fragments. There is no name clash.
There was a problem hiding this comment.
This config is being created twice in this class. Create in in the LocalQueryRunner constructor and store it as a field, to make sure it is always the same.
There was a problem hiding this comment.
- Nullify all the fields after test class is finished
- Use
createTestTransactionManager(catalogManager) - Add constructor method that accepts
transactionManagerinMetadataManager, and usecreateTestMetadataManager(new FeaturesConfig(), transactionManager) - Optionally you can create similar
createTest*methods fornodePartitioningManager, that creates testnodePartitioningManagerthat isn't required to be stopped, and can be simply nullified (uses direct executors instead of thread pools, etc.)
Here is the example:
private CostCalculator costCalculatorUsingExchanges;
private CostCalculator costCalculatorWithEstimatedExchanges;
private PlanFragmenter planFragmenter;
private Session session;
private MetadataManager metadata;
private TransactionManager transactionManager;
private FinalizerService finalizerService;
private NodeScheduler nodeScheduler;
private NodePartitioningManager nodePartitioningManager;
@BeforeClass
public void setUp()
{
costCalculatorUsingExchanges = new CostCalculatorUsingExchanges(() -> NUMBER_OF_NODES);
costCalculatorWithEstimatedExchanges = new CostCalculatorWithEstimatedExchanges(costCalculatorUsingExchanges, () -> NUMBER_OF_NODES);
planFragmenter = new PlanFragmenter(new QueryManagerConfig());
session = testSessionBuilder().setCatalog("tpch").build();
CatalogManager catalogManager = new CatalogManager();
catalogManager.registerCatalog(createBogusTestingCatalog("tpch"));
transactionManager = createTestTransactionManager(catalogManager);
metadata = createTestMetadataManager(new FeaturesConfig(), transactionManager);
finalizerService = new FinalizerService();
finalizerService.start();
nodeScheduler = new NodeScheduler(
new LegacyNetworkTopology(),
new InMemoryNodeManager(),
new NodeSchedulerConfig().setIncludeCoordinator(true),
new NodeTaskMap(finalizerService));
nodePartitioningManager = new NodePartitioningManager(nodeScheduler);
}
@AfterClass(alwaysRun = true)
public void tearDown()
{
costCalculatorUsingExchanges = null;
costCalculatorWithEstimatedExchanges = null;
planFragmenter = null;
session = null;
transactionManager = null;
metadata = null;
finalizerService.destroy();
finalizerService = null;
nodeScheduler.stop();
nodeScheduler = null;
nodePartitioningManager = null;
}
There was a problem hiding this comment.
Similar comment about bootstraping the test class
private Metadata metadata;
private Session session;
@BeforeMethod
public void setUp()
{
metadata = createTestMetadataManager();
session = testSessionBuilder().build();
}
@AfterMethod(alwaysRun = true)
public void tearDown()
{
metadata = null;
session = null;
}
ddbf812 to
958a25d
Compare
|
Thanks for the review! Updated and I'll merge on Monday. |
This allows us to calculate stats for fragmented plans by using the equivalent plan with exchanges.
Print estimated stats and costs for all fragements in a distributed
explain plan.
Sample output:
presto:sf1> explain (type distributed) select * from nation n join region r on n.regionkey = r.regionkey;
Query Plan
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [nationkey, name, regionkey, comment, regionkey, name_1, comment_2]
Output partitioning: SINGLE []
Execution Flow: UNGROUPED_EXECUTION
- Output[nationkey, name, regionkey, comment, regionkey, name, comment] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152), regionkey:bigint, name_1:varchar(25), comment_2:varchar(152)]
Cost: {rows: 25 (4.91kB), cpu: 15427.00, memory: 504.00, network: 0.00}
name := name_1
comment := comment_2
- RemoteSource[1] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152), name_1:varchar(25), comment_2:varchar(152)]
Cost: {rows: 25 (4.69kB), cpu: 15427.00, memory: 504.00, network: 0.00}
Fragment 1 [HASH]
Output layout: [nationkey, name, regionkey, comment, name_1, comment_2]
Output partitioning: SINGLE []
Execution Flow: UNGROUPED_EXECUTION
- InnerJoin[("regionkey" = "regionkey_0")][$hashvalue, $hashvalue_34] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152), name_1:varchar(25), comment_2:varchar(152)]
Distribution: PARTITIONED
Cost: {rows: 25 (4.69kB), cpu: 15427.00, memory: 504.00, network: 0.00}
- RemoteSource[2] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152), $hashvalue:bigint]
Cost: {rows: 25 (2.89kB), cpu: 5693.00, memory: 0.00, network: 0.00}
- LocalExchange[HASH][$hashvalue_34] ("regionkey_0") => regionkey_0:bigint, name_1:varchar(25), comment_2:varchar(152), $hashvalue_34:bigint
Cost: {rows: 5 (504B), cpu: 1467.00, memory: 0.00, network: 0.00}
- RemoteSource[3] => [regionkey_0:bigint, name_1:varchar(25), comment_2:varchar(152), $hashvalue_35:bigint]
Cost: {rows: 5 (504B), cpu: 963.00, memory: 0.00, network: 0.00}
Fragment 2 [SOURCE]
Output layout: [nationkey, name, regionkey, comment, $hashvalue_33]
Output partitioning: HASH [regionkey][$hashvalue_33]
Execution Flow: UNGROUPED_EXECUTION
- ScanProject[table = tpch:tpch:nation:sf1.0, originalConstraint = true] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152), $hashvalue_33:bigint]
Cost: {rows: 25 (2.67kB), cpu: 2734.00, memory: 0.00, network: 0.00}/{rows: 25 (2.89kB), cpu: 5693.00, memory: 0.00, network: 0.00}
$hashvalue_33 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("regionkey"), 0))
nationkey := tpch:nationkey
regionkey := tpch:regionkey
name := tpch:name
comment := tpch:comment
Fragment 3 [SOURCE]
Output layout: [regionkey_0, name_1, comment_2, $hashvalue_36]
Output partitioning: HASH [regionkey_0][$hashvalue_36]
Execution Flow: UNGROUPED_EXECUTION
- ScanProject[table = tpch:tpch:region:sf1.0, originalConstraint = true] => [regionkey_0:bigint, name_1:varchar(25), comment_2:varchar(152), $hashvalue_36:bigint]
Cost: {rows: 5 (459B), cpu: 459.00, memory: 0.00, network: 0.00}/{rows: 5 (504B), cpu: 963.00, memory: 0.00, network: 0.00}
$hashvalue_36 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("regionkey_0"), 0))
comment_2 := tpch:comment
name_1 := tpch:name
regionkey_0 := tpch:regionkey
(1 row)
958a25d to
15c9833
Compare
| builder.append(textLogicalPlan(fragment.getRoot(), TypeProvider.copyOf(fragment.getSymbols()), Optional.of(fragment.getStageExecutionStrategy()), functionRegistry, statsCalculator, costCalculator, session, Optional.empty(), 1, verbose)) | ||
| .append("\n"); | ||
| } | ||
| TypeProvider typeProvider = TypeProvider.copyOf(allFragments.stream() |
There was a problem hiding this comment.
@rschlussel : My understanding is we need allFragments as we want to create sourceProvider and statsCalculator. However, later you make stats and cost precomputed and store in PlanFragment: #11511
I am wondering do we still need to instantiate the TypeProvider with all fragments?
fixes #10963