
pushdown dereference expression#10064

Closed
qqibrow wants to merge 1 commit into prestodb:master from qqibrow:nestedcolumns

Conversation

@qqibrow
Contributor

qqibrow commented Mar 1, 2018

The design is as follows:

  1. PushdownDereferenceExpression
    Dereference expressions are pushed down to the projection right above the TableScan, which saves CPU, memory, and network cost.

For the query:

with t1 as ( select * from (values ROW(CAST(ROW(1, 2.0) AS ROW(x BIGINT, y DOUBLE)))) 
as t (msg) ) 
select b.msg.x from t1 a, t1 b where a.msg.y = b.msg.y

current plan:

Output[x] => [expr_16:bigint]
        Cost: {rows: 1 (8B), cpu: 297.00, memory: 69.00, network: 0.00}
        x := expr_16
    - Project[] => [expr_16:bigint]
            Cost: {rows: 1 (8B), cpu: 297.00, memory: 69.00, network: 0.00}
            expr_16 := "field_7".x
        - InnerJoin[("expr_20" = "expr_21")][$hashvalue, $hashvalue_22] => [field_7:row(x bigint, y double)]
                Cost: {rows: 1 (45B), cpu: 288.90, memory: 69.00, network: 0.00}
            - Project[] => [expr_20:double, $hashvalue:bigint]
                    Cost: {rows: 1 (18B), cpu: 27.00, memory: 0.00, network: 0.00}
                    $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_20"), 0))
                - Project[] => [expr_20:double]
                        Cost: {rows: 1 (9B), cpu: 9.00, memory: 0.00, network: 0.00}
                        expr_20 := "field".y
                    - Values => [field:row(x bigint, y double)]                       
            - Project[] => [field_7:row(x bigint, y double), expr_21:double, $hashvalue_22:bigint]
                    Cost: {rows: 1 (69B), cpu: 129.00, memory: 0.00, network: 0.00}
                    $hashvalue_22 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_21"), 0))
                - Project[] => [field_7:row(x bigint, y double), expr_21:double]
                        Cost: {rows: 1 (60B), cpu: 60.00, memory: 0.00, network: 0.00}
                        expr_21 := "field_7".y
                    - Values => [field_7:row(x bigint, y double)]

With dereference pushdown enabled:

 Output[x] => [expr_22:bigint]
        Cost: {rows: 1 (8B), cpu: 125.10, memory: 27.00, network: 0.00}
        x := expr_22
    - InnerJoin[("expr_23" = "expr_24")][$hashvalue, $hashvalue_25] => [expr_22:bigint]
            Cost: {rows: 1 (8B), cpu: 125.10, memory: 27.00, network: 0.00}
        - Project[] => [expr_23:double, $hashvalue:bigint]
                Cost: {rows: 1 (18B), cpu: 27.00, memory: 0.00, network: 0.00}
                $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_23"), 0))
            - Project[] => [expr_23:double]
                    Cost: {rows: 1 (9B), cpu: 9.00, memory: 0.00, network: 0.00}
                    expr_23 := "field".y
                - Values => [field:row(x bigint, y double)]
                      
        - Project[] => [expr_22:bigint, expr_24:double, $hashvalue_25:bigint]
                Cost: {rows: 1 (27B), cpu: 45.00, memory: 0.00, network: 0.00}
                $hashvalue_25 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_24"), 0))
            - Project[] => [expr_22:bigint, expr_24:double]
                    Cost: {rows: 1 (18B), cpu: 18.00, memory: 0.00, network: 0.00}
                    expr_22 := "field_7".x
                    expr_24 := "field_7".y
                - Values => [field_7:row(x bigint, y double)]
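The transformation above can be sketched with a toy model (class and method names here are illustrative, not Presto's actual rule API): assignments in the projection above the join whose base symbol is produced by one join side are candidates to be recomputed right above that side's scan, so only the extracted field flows through the join.

```java
import java.util.*;

// Toy sketch (not Presto's actual rule classes) of the PushdownDereferenceExpression
// idea: dereference assignments whose base symbol comes from a child of the join are
// moved into the projection closest to that child, so only the extracted field (a
// bigint here) flows through the join instead of the whole row. Names are illustrative.
public class PushdownSketch {
    // Collect parent assignments that are dereferences on a child's output symbol;
    // these can be pushed down and computed right above the child.
    static Map<String, String> pushableDereferences(Map<String, String> parentAssignments,
                                                    Set<String> childOutputs) {
        Map<String, String> pushed = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : parentAssignments.entrySet()) {
            String expr = e.getValue();
            int dot = expr.indexOf('.');
            if (dot > 0 && childOutputs.contains(expr.substring(0, dot))) {
                pushed.put(e.getKey(), expr); // compute this right above the child scan
            }
        }
        return pushed;
    }

    public static void main(String[] args) {
        Map<String, String> parent = new LinkedHashMap<>();
        parent.put("expr_22", "field_7.x"); // only x is needed above the join
        Set<String> childOutputs = new HashSet<>(Arrays.asList("field_7"));
        System.out.println(pushableDereferences(parent, childOutputs));
    }
}
```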
  2. MergeNestedColumns
    MergeNestedColumns detects the "project above TableScan" pattern and tries to push nested-column metadata into the TableScan. A new Metadata API, getNestedColumnHandles, is created to support connector-specific nested-column pushdown logic.

  3. getNestedColumnHandles in Hive
    getNestedColumnHandles in Hive returns each dereference as an independent HiveColumnHandle. Added an Optional<NestedColumn> field to HiveColumnHandle.
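A minimal sketch of what such a NestedColumn might look like (field and method names are assumptions for illustration; the PR's actual class may differ): a base column name plus the dereference path, so "msg.workflow.uuid" becomes base "msg" with path ["workflow", "uuid"].

```java
import java.util.*;

// Hypothetical sketch of a NestedColumn carried inside HiveColumnHandle:
// a base column plus the remaining dereference path. Names are illustrative,
// not necessarily the PR's actual implementation.
public class NestedColumn {
    private final String base;
    private final List<String> rest;

    public NestedColumn(String dottedPath) {
        List<String> parts = Arrays.asList(dottedPath.split("\\."));
        this.base = parts.get(0);                    // e.g. "msg"
        this.rest = parts.subList(1, parts.size()); // e.g. ["workflow", "uuid"]
    }

    public String getBase() { return base; }

    public List<String> getRest() { return rest; }

    // Full dotted name used as an independent column handle, e.g. "msg.workflow.uuid"
    public String getName() {
        StringJoiner joiner = new StringJoiner(".");
        joiner.add(base);
        rest.forEach(joiner::add);
        return joiner.toString();
    }
}
```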

After all of the above, the query plan looks like:

explain select msg.workflow.uuid, msg.action.uuid from foo.bar where msg.workflow.uuid = 'abc' and msg.action.name = 'send_sms' limit 10;

before:

- Output[uuid, uuid] => [expr_7:varchar, expr_8:varchar]
        uuid := expr_7
        uuid := expr_8
    - Limit[10] => [expr_7:varchar, expr_8:varchar]
        - LocalExchange[SINGLE] () => expr_7:varchar, expr_8:varchar
            - RemoteExchange[GATHER] => expr_7:varchar, expr_8:varchar
                - LimitPartial[10] => [expr_7:varchar, expr_8:varchar]
                    - ScanFilterProject[table = hive:foo.bar, filterPredicate = (("msg".workflow.uuid = CAST('abc' AS varchar)) AND ("msg".action.name = CAST('send_sms' AS varchar)))] => [expr_7:varchar, expr_8:varchar]
                            expr_7 := "msg".workflow.uuid
                            expr_8 := "msg".action.uuid
                            LAYOUT: foo.bar
                            msg := msg:struct<......>:12:REGULAR
                            datestr:string:-1:PARTITION_KEY
                                :: [[2000-05-23, 2050-10-26]]

after:

- Output[uuid, uuid] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
        uuid := msg.workflow.uuid
        uuid := msg.action.uuid
    - Limit[10] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
        - LocalExchange[SINGLE] () => msg.workflow.uuid:varchar, msg.action.uuid:varchar              
            - RemoteExchange[GATHER] => msg.workflow.uuid:varchar, msg.action.uuid:varchar                
                - LimitPartial[10] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]                       
                    - ScanFilterProject[table = hive:foo.bar, filterPredicate = (("msg.action.name" = CAST('send_sms' AS varchar)) AND ("msg.workflow.uuid" = CAST('abc' AS varchar)))] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
                            LAYOUT: foo.bar
                            msg.action.uuid := msg.action.uuid:string:12:REGULAR
                            msg.workflow.uuid := msg.workflow.uuid:string:12:REGULAR
                            msg.action.name := msg.action.name:string:12:REGULAR
                            datestr:string:-1:PARTITION_KEY
                                :: [[2000-05-23, 2050-10-26]]
  4. Code change in ParquetReader so that it reads only the selected columns defined in HiveColumnHandle (..., Optional)
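The reader-side idea can be sketched as follows (names are illustrative, not the PR's actual code): given the leaf columns present in a Parquet file and the dotted paths requested via the column handles, read only the matching leaves instead of materializing the whole top-level struct.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical sketch of selected-column reading: keep only file leaves that
// exactly match a requested path, or fall under a requested prefix. This is a
// simplified illustration, not Presto's ParquetReader code.
public class SelectedLeavesSketch {
    static List<String> leavesToRead(List<String> fileLeaves, Set<String> requestedPaths) {
        return fileLeaves.stream()
                .filter(leaf -> requestedPaths.stream().anyMatch(
                        path -> leaf.equals(path) || leaf.startsWith(path + ".")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> fileLeaves = Arrays.asList(
                "msg.workflow.uuid", "msg.workflow.name", "msg.action.uuid", "other");
        Set<String> requested = new HashSet<>(Arrays.asList("msg.workflow.uuid", "msg.action"));
        System.out.println(leavesToRead(fileLeaves, requested));
    }
}
```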

@kokosing (Contributor) left a comment

Just skimmed. Left some general comments.

Contributor

why is it behind the feature flag?

Contributor Author

removed it.

Contributor

The name is odd. What is the purpose of this and why com.facebook.presto.sql.planner.SymbolsExtractor is not good enough for you case?

Contributor Author

Good point. I replaced it using SymbolsExtractor.

Contributor

It is a bad idea. Identifier belongs to AST and symbol to IR (plan node tree) model.

Contributor Author

the class is removed.

Contributor

static import?

Contributor Author

The InlineDereferenceExpressions rule is removed. The PushdownDereferenceExpression class and the change in InlineProjections are enough.

Contributor

I am sorry but I do not see any test for that Rule.

Contributor Author

class is removed.

Contributor

Is there any point in adding this to all optimizers if you cannot take advantage of it in the table scan operator today?

Maybe we could just add dead code until the moment you have all the pieces needed to prune unreferenced columns.

Contributor Author

I see. After this part, I will start working on part II, and I will not merge this until that is finished. What do you think?

Contributor

What about other expression types that may contain a Symbol?

Contributor

Please format this as:

Set<Symbol> dereferenceSymbols = parent.getAssignments().entrySet().stream()
    .filter(entry -> isBaseSymbolInChild.apply(entry.getValue()))
    .map(Map.Entry::getKey)
    .collect(Collectors.toSet());

Use the same formatting for Assignments.builder()....build();

Contributor

To me it is a bit odd to create a Function and then use it as a Predicate.

Contributor

Please add tests.

Contributor Author

Shall I add tests now, or after refactoring using Rules (if refactoring is possible)?

Contributor

Put it below Optimizer.

@qqibrow
Contributor Author

qqibrow commented Mar 7, 2018

@kokosing thank you for reviewing! I refactored the code a bit and am working on fixing several test failures. Could you take a look at my replies first? Thanks!

afranzi pushed a commit to afranzi/presto that referenced this pull request Apr 25, 2018
afranzi added a commit to afranzi/presto that referenced this pull request Apr 25, 2018
* prestodb#10064 - Pushdown dereference expression

@qqibrow
Contributor Author

qqibrow commented Jun 1, 2018

Sorry for the delay. I was waiting for the new Parquet reader fixes and refactoring in #9156. I will continue this patch from now on.

@oneonestar
Contributor

presto:test> create table test( str varchar, filter row(x int, y int) );
CREATE TABLE
presto:test> select str from test where filter.x = 1;
Query 20180926_041036_00004_ngfpn failed: Invalid node. Expression dependencies ([str]) not in source plan output ([filter_4])
java.lang.IllegalArgumentException: Invalid node. Expression dependencies ([str]) not in source plan output ([filter_4])
	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:157)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker.checkDependencies(ValidateDependenciesChecker.java:614)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker.access$100(ValidateDependenciesChecker.java:79)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker$Visitor.visitProject(ValidateDependenciesChecker.java:250)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker$Visitor.visitProject(ValidateDependenciesChecker.java:93)
	at com.facebook.presto.sql.planner.plan.ProjectNode.accept(ProjectNode.java:92)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker$Visitor.visitExchange(ValidateDependenciesChecker.java:466)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker$Visitor.visitExchange(ValidateDependenciesChecker.java:93)
	at com.facebook.presto.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:243)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker$Visitor.visitOutput(ValidateDependenciesChecker.java:295)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker$Visitor.visitOutput(ValidateDependenciesChecker.java:93)
	at com.facebook.presto.sql.planner.plan.OutputNode.accept(OutputNode.java:82)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker.validate(ValidateDependenciesChecker.java:90)
	at com.facebook.presto.sql.planner.sanity.ValidateDependenciesChecker.validate(ValidateDependenciesChecker.java:85)
	at com.facebook.presto.sql.planner.sanity.PlanSanityChecker.lambda$validateFinalPlan$0(PlanSanityChecker.java:60)
	at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:407)
	at com.facebook.presto.sql.planner.sanity.PlanSanityChecker.validateFinalPlan(PlanSanityChecker.java:60)
	at com.facebook.presto.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:157)
	at com.facebook.presto.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:139)
	at com.facebook.presto.execution.SqlQueryExecution.doAnalyzeQuery(SqlQueryExecution.java:360)
	at com.facebook.presto.execution.SqlQueryExecution.analyzeQuery(SqlQueryExecution.java:345)
	at com.facebook.presto.execution.SqlQueryExecution.start(SqlQueryExecution.java:289)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

I got this error using the Hive connector. However, I can't reproduce it using a WITH ... VALUES statement.

@qqibrow
Contributor Author

qqibrow commented Oct 26, 2018

@oneonestar
Sorry for the late reply. I recently refined the design, making each nested column a separate column handle.

In the previous approach, all nested columns were treated as a single column handle by the Parquet reader. Although this simplified the design and code structure, it has two major drawbacks:

  1. Lazy reads cannot be fully utilized. For a query like select msg.foo, msg.bar where msg.foo = 5, if one row group has no value satisfying msg.foo = 5, the reader should ideally skip reading msg.bar. This is not possible in the current approach, since msg.foo and msg.bar are treated as a single column handle.

  2. Nested predicates are hard to push down. Again, for a query like select msg.foo, msg.bar where msg.foo = 5, we want to push the msg.foo = 5 predicate down to the reader to prune any segments that don't match.
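The lazy-read benefit in point 1 can be illustrated with a small self-contained sketch (all names are hypothetical; Presto's actual Parquet reader is far more involved): with msg.foo and msg.bar as separate handles, a row group whose msg.foo values never match the predicate never touches msg.bar at all.

```java
import java.util.*;
import java.util.function.IntPredicate;

// Toy illustration of lazy reads with per-field column handles. Row groups are
// modeled as int arrays; a group's msg.bar data is only "read" when at least one
// msg.foo value in that group satisfies the predicate. This is not Presto's
// reader code; it only demonstrates the pruning opportunity.
public class LazyReadSketch {
    static int barGroupsRead; // how many groups of msg.bar were actually read

    static List<Integer> selectBarWhereFoo(List<int[]> fooGroups, List<int[]> barGroups, IntPredicate filter) {
        barGroupsRead = 0;
        List<Integer> result = new ArrayList<>();
        for (int g = 0; g < fooGroups.size(); g++) {
            int[] foo = fooGroups.get(g);
            if (Arrays.stream(foo).noneMatch(filter)) {
                continue; // whole group pruned: msg.bar is never read
            }
            int[] bar = barGroups.get(g); // the "lazy" read happens only here
            barGroupsRead++;
            for (int i = 0; i < foo.length; i++) {
                if (filter.test(foo[i])) {
                    result.add(bar[i]);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> foo = Arrays.asList(new int[] {1, 2}, new int[] {5, 3});
        List<int[]> bar = Arrays.asList(new int[] {10, 20}, new int[] {50, 30});
        System.out.println(selectBarWhereFoo(foo, bar, x -> x == 5));
        System.out.println("bar groups read: " + barGroupsRead);
    }
}
```

With a single column handle for msg, both groups of msg.bar would have to be materialized before filtering.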

The refined explain looks like:

explain select msg.workflow.uuid, msg.action.uuid from foo.bar where msg.workflow.uuid = 'abc' and msg.action.name = 'send_sms' limit 10;

before:

- Output[uuid, uuid] => [expr_7:varchar, expr_8:varchar]
        uuid := expr_7
        uuid := expr_8
    - Limit[10] => [expr_7:varchar, expr_8:varchar]
        - LocalExchange[SINGLE] () => expr_7:varchar, expr_8:varchar
            - RemoteExchange[GATHER] => expr_7:varchar, expr_8:varchar
                - LimitPartial[10] => [expr_7:varchar, expr_8:varchar]
                    - ScanFilterProject[table = hive:foo.bar, filterPredicate = (("msg".workflow.uuid = CAST('abc' AS varchar)) AND ("msg".action.name = CAST('send_sms' AS varchar)))] => [expr_7:varchar, expr_8:varchar]
                            expr_7 := "msg".workflow.uuid
                            expr_8 := "msg".action.uuid
                            LAYOUT: foo.bar
                            msg := msg:struct<......>:12:REGULAR
                            datestr:string:-1:PARTITION_KEY
                                :: [[2000-05-23, 2050-10-26]]

after:

- Output[uuid, uuid] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
        uuid := msg.workflow.uuid
        uuid := msg.action.uuid
    - Limit[10] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
        - LocalExchange[SINGLE] () => msg.workflow.uuid:varchar, msg.action.uuid:varchar              
            - RemoteExchange[GATHER] => msg.workflow.uuid:varchar, msg.action.uuid:varchar                
                - LimitPartial[10] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]                       
                    - ScanFilterProject[table = hive:foo.bar, filterPredicate = (("msg.action.name" = CAST('send_sms' AS varchar)) AND ("msg.workflow.uuid" = CAST('abc' AS varchar)))] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
                            LAYOUT: foo.bar
                            msg.action.uuid := msg.action.uuid:string:12:REGULAR
                            msg.workflow.uuid := msg.workflow.uuid:string:12:REGULAR
                            msg.action.name := msg.action.name:string:12:REGULAR
                            datestr:string:-1:PARTITION_KEY
                                :: [[2000-05-23, 2050-10-26]]

Most of the code is done; I am now intensively testing the new version. Hopefully I can send a diff by the end of next week (it might be missing some unit tests, but the functionality is ready).

Add PushDownDereferenceExpression to push down dereferences right above TableScan
Add MergeNestedColumn to convert valid dereferences into ColumnHandles in TableScan
Add NestedColumn to HiveColumnHandle
Change ParquetReader to use NestedColumn in file reading
@qqibrow
Contributor Author

qqibrow commented Nov 17, 2018

All functionality is done.

@qqibrow
Contributor Author

qqibrow commented Dec 3, 2018

@martint could you take a look at the overall code structure? thanks!

@qqibrow
Contributor Author

qqibrow commented Jan 15, 2019

@martint @dain @electrum Could any of you review this diff and provide some feedback? I really want to push it further :)

@mattsfuller
Contributor

Hi @qqibrow -- if you resubmit the PR to https://github.com/prestosql/presto we'd be happy to take a look and move it forward.

@mbasmanova
Contributor

@qqibrow How ready is this PR? Can it be rebased to resolve conflicts? Is it possible to extend this to support subscripts for arrays and maps, e.g. SELECT a[1], a[3] FROM t WHERE a[1] = 10?

@mbasmanova
Contributor

I assume #13180 supersedes this one; hence, closing.

@mbasmanova mbasmanova closed this Aug 27, 2019