Rewrite expressions in query plans when variables are constant. #19836

Merged
feilong-liu merged 3 commits into prestodb:master from feilong-liu:pass_constant on Jan 23, 2024

@feilong-liu feilong-liu commented Jun 8, 2023

Rewrite expressions in query plans if we know some variables are constant.

This optimizer has three benefits:

  • Add a projection that assigns a constant to a variable when the variable is known to be constant. At runtime, constant projection assignments return a RunLengthEncoded block, which consumes less memory, and we also have a handful of runtime optimizations for RunLengthEncoded blocks.
  • Reduce the amount of data propagated during query execution. For example, if we have a filter on a column at the very bottom of the query and this column is not needed by some intermediate operators, we can avoid passing the column between those operators. A simplified example is select orderkey, l.partkey, o.orderstatus from lineitem l join orders o using (orderkey) where orderstatus='F'. With the optimization, orderstatus will not be in the join hash table.
  • Simplify some nodes, for example Sort on constant variables, group by on constant variables, etc.
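
To illustrate the memory argument in the first point, here is a minimal toy sketch of why a run-length-encoded column is cheaper than a materialized one when every position holds the same value. The classes `FlatColumn` and `RunLengthColumn` are hypothetical names made up for this illustration; they are not Presto's block classes and do not reflect their API.

```java
import java.util.Arrays;

// Toy illustration only: these classes are hypothetical and are not Presto's block classes.
final class RunLengthSketch
{
    // Materializes the value at every position, as a regular column would.
    static final class FlatColumn
    {
        private final String[] values;

        FlatColumn(String value, int positionCount)
        {
            values = new String[positionCount];
            Arrays.fill(values, value);
        }

        String get(int position)
        {
            return values[position];
        }

        int storedSlots()
        {
            return values.length;
        }
    }

    // Keeps a single value plus a position count, which is enough for a constant column.
    static final class RunLengthColumn
    {
        private final String value;
        private final int positionCount;

        RunLengthColumn(String value, int positionCount)
        {
            this.value = value;
            this.positionCount = positionCount;
        }

        String get(int position)
        {
            if (position < 0 || position >= positionCount) {
                throw new IndexOutOfBoundsException("position " + position);
            }
            return value;
        }

        int storedSlots()
        {
            return 1;
        }
    }

    public static void main(String[] args)
    {
        FlatColumn flat = new FlatColumn("F", 1000);
        RunLengthColumn rle = new RunLengthColumn("F", 1000);
        // Both columns answer every lookup identically; only the storage differs.
        System.out.println(flat.get(999).equals(rle.get(999)));
        System.out.println(flat.storedSlots() + " slots vs " + rle.storedSlots());
    }
}
```

The point of the sketch is only that a constant column needs one stored value regardless of row count; the runtime optimizations mentioned above are not modeled here.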

A variable reference expression in a query plan can be constant in two cases: 1) after a filter expression. For example, in select orderstatus, shippriority, count(*) from orders where orderstatus='F' group by orderstatus, shippriority, orderstatus is constant after the filter. 2) after a project node that assigns a constant to a variable reference.

Once we know which variable references are constant, we can avoid passing them along the query plan. In the query above, for example, orderstatus is not passed along the plan.

Query: select orderstatus, shippriority, count(*) from orders where orderstatus='F' group by orderstatus, shippriority
Before:

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 0 [SINGLE]                                                                                                                                                                                               >
     Output layout: [orderstatus, shippriority, count]                                                                                                                                                             >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Output[orderstatus, shippriority, _col2] => [orderstatus:varchar(1), shippriority:integer, count:bigint]                                                                                                    >
             _col2 := count (1:62)                                                                                                                                                                                 >
         - RemoteSource[1] => [orderstatus:varchar(1), shippriority:integer, count:bigint]                                                                                                                         >
                                                                                                                                                                                                                   >
 Fragment 1 [HASH]                                                                                                                                                                                                 >
     Output layout: [orderstatus, shippriority, count]                                                                                                                                                             >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Project[projectLocality = LOCAL] => [orderstatus:varchar(1), shippriority:integer, count:bigint]                                                                                                            >
         - Aggregate(FINAL)[orderstatus, shippriority][$hashvalue] => [orderstatus:varchar(1), shippriority:integer, $hashvalue:bigint, count:bigint]                                                              >
                 count := "presto.default.count"((count_13)) (1:62)                                                                                                                                                >
             - LocalExchange[HASH][$hashvalue] (orderstatus, shippriority) => [orderstatus:varchar(1), shippriority:integer, count_13:bigint, $hashvalue:bigint]                                                   >
                 - RemoteSource[2] => [orderstatus:varchar(1), shippriority:integer, count_13:bigint, $hashvalue_14:bigint]                                                                                        >
                                                                                                                                                                                                                   >
 Fragment 2 [SOURCE]                                                                                                                                                                                               >
     Output layout: [orderstatus, shippriority, count_13, $hashvalue_15]                                                                                                                                           >
     Output partitioning: HASH [orderstatus, shippriority][$hashvalue_15]                                                                                                                                          >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Aggregate(PARTIAL)[orderstatus, shippriority][$hashvalue_15] => [orderstatus:varchar(1), shippriority:integer, $hashvalue_15:bigint, count_13:bigint]                                                       >
             count_13 := "presto.default.count"(*) (1:62)                                                                                                                                                          >
         - ScanFilterProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=orders, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.orders{dom>
                 Estimates: {source: CostBasedSourceInfo, rows: 15000 (424.80kB), cpu: 165000.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 5000 (141.60kB), cpu: 330000.00, memory: 0.00, n>
                 $hashvalue_15 := combine_hash(combine_hash(BIGINT'0', COALESCE($operator$hash_code(orderstatus), BIGINT'0')), COALESCE($operator$hash_code(shippriority), BIGINT'0')) (1:114)                     >
                 LAYOUT: tpch.orders{domains={orderstatus=[ [["F"]] ]}}                                                                                                                                            >
                 orderstatus := orderstatus:varchar(1):2:REGULAR (1:76)                                                                                                                                            >
                 shippriority := shippriority:int:7:REGULAR (1:76)   

After optimization:

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 0 [SINGLE]                                                                                                                                                                                               >
     Output layout: [expr_10, shippriority, count]                                                                                                                                                                 >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Output[orderstatus, shippriority, _col2] => [expr_10:varchar(1), shippriority:integer, count:bigint]                                                                                                        >
             orderstatus := expr_10 (1:35)                                                                                                                                                                         >
             _col2 := count (1:62)                                                                                                                                                                                 >
         - RemoteSource[1] => [expr_10:varchar(1), shippriority:integer, count:bigint]                                                                                                                             >
                                                                                                                                                                                                                   >
 Fragment 1 [HASH]                                                                                                                                                                                                 >
     Output layout: [expr_10, shippriority, count]                                                                                                                                                                 >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Project[projectLocality = LOCAL] => [expr_10:varchar(1), shippriority:integer, count:bigint]                                                                                                                >
             expr_10 := VARCHAR'F'                                                                                                                                                                                 >
         - Aggregate(FINAL)[shippriority][$hashvalue] => [shippriority:integer, $hashvalue:bigint, count:bigint]                                                                                                   >
                 count := "presto.default.count"((count_13)) (1:62)                                                                                                                                                >
             - LocalExchange[HASH][$hashvalue] (shippriority) => [shippriority:integer, count_13:bigint, $hashvalue:bigint]                                                                                        >
                 - RemoteSource[2] => [shippriority:integer, count_13:bigint, $hashvalue_14:bigint]                                                                                                                >
                                                                                                                                                                                                                   >
 Fragment 2 [SOURCE]                                                                                                                                                                                               >
     Output layout: [shippriority, count_13, $hashvalue_15]                                                                                                                                                        >
     Output partitioning: HASH [shippriority][$hashvalue_15]                                                                                                                                                       >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Aggregate(PARTIAL)[shippriority][$hashvalue_15] => [shippriority:integer, $hashvalue_15:bigint, count_13:bigint]                                                                                            >
             count_13 := "presto.default.count"(*) (1:62)                                                                                                                                                          >
         - ScanFilterProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=orders, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.orders{dom>
                 Estimates: {source: CostBasedSourceInfo, rows: 15000 (336.91kB), cpu: 165000.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 5000 (112.30kB), cpu: 330000.00, memory: 0.00, n>
                 $hashvalue_15 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(shippriority), BIGINT'0')) (1:127)                                                                                          >
                 LAYOUT: tpch.orders{domains={orderstatus=[ [["F"]] ]}}                                                                                                                                            >
                 orderstatus := orderstatus:varchar(1):2:REGULAR (1:76)                                                                                                                                            >
                 shippriority := shippriority:int:7:REGULAR (1:76) 
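
The difference between the two plans (orderstatus dropped from the grouping keys and reintroduced by a constant projection) can be sketched roughly as follows. This is a toy model, not the optimizer code from this PR: `Equality`, `constantsFromFilter`, and `pruneGroupingKeys` are hypothetical names, and the sketch assumes the filter is a simple list of `variable = literal` conjuncts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration only: hypothetical names, not the optimizer code from this PR.
final class ConstantVariableSketch
{
    // A simplified filter conjunct of the form `variable = literal`.
    static final class Equality
    {
        final String variable;
        final String literal;

        Equality(String variable, String literal)
        {
            this.variable = variable;
            this.literal = literal;
        }
    }

    // Above the filter, each variable compared for equality with a literal holds that literal.
    // Contradictory conjuncts on the same variable are not handled in this sketch.
    static Map<String, String> constantsFromFilter(List<Equality> conjuncts)
    {
        Map<String, String> constants = new LinkedHashMap<>();
        for (Equality conjunct : conjuncts) {
            constants.put(conjunct.variable, conjunct.literal);
        }
        return constants;
    }

    // Drops constant grouping keys. If every key is constant, the keys are kept unchanged,
    // because an aggregation with no keys behaves differently on empty input.
    static List<String> pruneGroupingKeys(List<String> keys, Map<String, String> constants)
    {
        List<String> remaining = new ArrayList<>();
        for (String key : keys) {
            if (!constants.containsKey(key)) {
                remaining.add(key);
            }
        }
        return remaining.isEmpty() ? keys : remaining;
    }

    public static void main(String[] args)
    {
        Map<String, String> constants = constantsFromFilter(Arrays.asList(new Equality("orderstatus", "'F'")));
        List<String> keys = pruneGroupingKeys(Arrays.asList("orderstatus", "shippriority"), constants);
        System.out.println("grouping keys: " + keys);
        System.out.println("projected constants: " + constants);
    }
}
```

Keeping the keys unchanged when all of them are constant is a conservative choice made for this sketch; how the actual rule treats that case is not shown in the description above.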

Test plan - (Please fill in how you tested your changes)

Added unit tests and ran verifier runs: run, run2, run3, run4, run5, run6

== RELEASE NOTES ==

General Changes
* Add an optimization for queries that have equality-check filters or constant assignments. It is controlled by the session property `rewrite_expression_with_constant_expression` and is enabled by default.

@feilong-liu feilong-liu requested a review from a team as a code owner June 8, 2023 17:55
@feilong-liu feilong-liu requested a review from presto-oss June 8, 2023 17:55
@feilong-liu feilong-liu marked this pull request as draft June 8, 2023 17:55
@feilong-liu feilong-liu force-pushed the pass_constant branch 6 times, most recently from 6f2b197 to 26b3a1b Compare June 14, 2023 23:48
@feilong-liu feilong-liu force-pushed the pass_constant branch 8 times, most recently from f026e02 to 1262756 Compare June 21, 2023 23:29
@feilong-liu feilong-liu force-pushed the pass_constant branch 2 times, most recently from 99ee828 to ab08ba2 Compare July 10, 2023 23:01
@feilong-liu feilong-liu changed the title [Draft] pass constant variable in query plan Rewrite expressions in query plans when variables are constant. Jul 10, 2023
@feilong-liu feilong-liu marked this pull request as ready for review July 10, 2023 23:47
Contributor

Why do we need this constraint?

Contributor Author

No particular reason; I am just being cautious here.

Contributor

I would remove this. I'm not sure what benefit we get from limiting the supported types here.

@mlyublena
Contributor

Do we also rerun constant folding if we find that all of the variables in an expression have become constants? For example:

select orderkey+1 as nk from lineitem where orderkey=1;
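
To make the question concrete: with orderkey known to be 1, the expression orderkey+1 could in principle be folded to the literal 2. The following toy sketch shows that substitute-then-fold step on a tiny expression tree. All names (`Expr`, `Literal`, `Variable`, `Add`, `rewrite`) are hypothetical and invented for this illustration; it is not taken from the PR and says nothing about whether the optimizer actually reruns constant folding.

```java
import java.util.Collections;
import java.util.Map;

// Toy illustration only: a hypothetical expression tree, not Presto's expression classes.
final class ConstantFoldingSketch
{
    interface Expr {}

    static final class Literal implements Expr
    {
        final long value;

        Literal(long value)
        {
            this.value = value;
        }
    }

    static final class Variable implements Expr
    {
        final String name;

        Variable(String name)
        {
            this.name = name;
        }
    }

    static final class Add implements Expr
    {
        final Expr left;
        final Expr right;

        Add(Expr left, Expr right)
        {
            this.left = left;
            this.right = right;
        }
    }

    // Replaces known-constant variables with literals, then folds additions
    // whose operands have both become literals.
    static Expr rewrite(Expr expr, Map<String, Long> constants)
    {
        if (expr instanceof Variable) {
            Long value = constants.get(((Variable) expr).name);
            return value == null ? expr : new Literal(value);
        }
        if (expr instanceof Add) {
            Expr left = rewrite(((Add) expr).left, constants);
            Expr right = rewrite(((Add) expr).right, constants);
            if (left instanceof Literal && right instanceof Literal) {
                // addExact throws on overflow instead of silently wrapping.
                return new Literal(Math.addExact(((Literal) left).value, ((Literal) right).value));
            }
            return new Add(left, right);
        }
        return expr;
    }

    public static void main(String[] args)
    {
        Expr orderkeyPlusOne = new Add(new Variable("orderkey"), new Literal(1));
        Expr folded = rewrite(orderkeyPlusOne, Collections.singletonMap("orderkey", 1L));
        System.out.println(folded instanceof Literal ? ((Literal) folded).value : "not folded");
    }
}
```

Variables that are not in the constants map are left untouched, so an expression that mixes constant and non-constant variables is only partially simplified in this sketch.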

Contributor

This looks like a useful utility function that we could add to PlannerUtils, something like isEquality.

Contributor Author

Looks like we already have such utils; updated to use the existing ones.

@aaneja
Contributor

aaneja commented Nov 7, 2023

> I do not think the estimated cost reported here reflects the real change.

I agree. Let's open an issue to model the cost reduction that we expect for such a ProjectNode?

@feilong-liu feilong-liu force-pushed the pass_constant branch 3 times, most recently from 38860b0 to 31f3e6b Compare November 8, 2023 20:24
@feilong-liu feilong-liu requested review from aaneja and removed request for aaneja November 8, 2023 23:44
Contributor

Since the constant folding rule will make some Join outputs redundant, IMO we should run PruneUnreferencedOutputs immediately after it (and call out the need for that as well). Otherwise there's a chance we move PruneUnreferencedOutputs even further down during a refactor.

Comment on lines +490 to +732
Contributor

Does this get replaced with an empty values node by some other rule later?

Contributor Author

Yes, it will be converted to an empty values node by predicate pushdown later.

Contributor

nit: Using Map keys like orderkey_11 may give the impression that the plan matcher will do an exact match on this text, but that is not the case. Let's use a more descriptive key name for this assignments matcher.

Contributor

@aaneja aaneja Nov 10, 2023

Found the root cause: when the PlanNode is an OutputNode, we can skip running planAndReplace on the output node and just call accept on its source, since the OutputNode will never have a parent. Once I added:

    @Override
    public PlanNodeWithConstant visitOutput(OutputNode node, Void context)
    {
        PlanNodeWithConstant updatedSource = accept(node.getSource());
        return new PlanNodeWithConstant(node.replaceChildren(ImmutableList.of(updatedSource.getPlanNode())), ImmutableMap.of());
    }

the duplicate Projects that were being added went away.
We could do this for other terminal nodes as well (TableWriter, TableFinish)?

Contributor

@aaneja aaneja left a comment

LGTM % some minor comments

@feilong-liu feilong-liu force-pushed the pass_constant branch 3 times, most recently from 713d9fe to 2fc07ea Compare November 11, 2023 02:08
@feilong-liu feilong-liu force-pushed the pass_constant branch 2 times, most recently from 7a7e1f4 to 8342573 Compare January 5, 2024 23:12
@steveburnett
Contributor

Would it be appropriate to add documentation of this REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION session property? Perhaps in Properties Reference.

@github-actions

Codenotify: Notifying subscribers in CODENOTIFY files for diff 3d6bba2...7cb8801.

Notify File(s)
@steveburnett presto-docs/src/main/sphinx/admin/properties.rst

Contributor

@pranjalssh pranjalssh left a comment

LGTM

Contributor

@steveburnett steveburnett left a comment

LGTM! (docs)

Pull branch, local build of docs, everything looks good. Thanks!

@feilong-liu feilong-liu merged commit 41d290e into prestodb:master Jan 23, 2024
@feilong-liu feilong-liu deleted the pass_constant branch January 23, 2024 05:11
@wanglinsong wanglinsong mentioned this pull request Feb 12, 2024