Dereference Projection Pushdown in Query Plan#2672

Merged
martint merged 5 commits into trinodb:master from phd3:deref-pushdown-in-plan
May 21, 2020

Conversation

@phd3 (Member) commented Jan 29, 2020

Supersedes #1435.

@cla-bot cla-bot bot added the cla-signed label Jan 29, 2020
@phd3 (Member Author) commented Jan 29, 2020

(1)

@martint suggested that the dereference rules should go into the iterative optimizer, where most of the rules are. Since dereference pushdown is also projection pushdown, I moved the rules into projectionPushdownRules. This causes a couple of TestPushdownDereferences tests to fail.

The root cause of the failures: after moving dereference pushdown into projectionPushdownRules, predicate pushdown no longer runs after dereference pushdown.

For a query like

WITH t(msg) AS (VALUES ROW(CAST(ROW(1, 2.0) AS ROW(x BIGINT, y DOUBLE))))
SELECT a.msg.y FROM t a WHERE a.msg.x = 7 
  1. Plan with #1435 (Pushdown dereference expression in query plan):
Output[y]
│   Layout: [expr_6:double]
│   y := expr_6
└─ FilterProject[filterPredicate = ("field".x = BIGINT '7')]
   │   Layout: [expr_6:double]
   │   expr_6 := "field".y
   └─ Values
          Layout: [field:row(x bigint, y double)]
  2. Plan now:
Output[y]
│   Layout: [expr_21:double]
│   y := expr_21
└─ FilterProject[filterPredicate = ("expr_22" = BIGINT '7')]
   │   Layout: [expr_21:double]
   └─ Project[]
      │   Layout: [expr_21:double, expr_22:bigint]
      │   expr_21 := "field".y
      │   expr_22 := "field".x
      └─ Values
             Layout: [field:row(x bigint, y double)]
  • I think plan (1) is better than plan (2), i.e. if PushProjectionIntoTableScan is not able to push the dereferences down to the connector, PredicatePushdown should push the predicates down the plan again.
  • This should be achievable with the ordering: projection pushdown in the plan --> PushProjectionIntoTableScan --> PredicatePushdown, i.e. PredicatePushdown is ALWAYS executed after projection pushdown. I'm not sure whether doing this will impact other plans. @martint what are your thoughts on this?
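To illustrate why running PredicatePushdown after projection pushdown recovers the tighter plan (1), here's a minimal sketch (plain Python over a string model, not Trino's actual planner classes): inlining the synthesized projection back into the predicate yields a filter that applies directly to the source column, which can then be pushed below the Project.

```python
def inline_assignments(predicate, assignments):
    """Replace a synthesized symbol in a simple (symbol, op, value) predicate
    with its defining expression from the projection below it."""
    symbol, op, value = predicate
    return (assignments.get(symbol, symbol), op, value)

# Plan (2): Filter("expr_22" = 7) over Project(expr_21 := field.y, expr_22 := field.x)
assignments = {"expr_21": "field.y", "expr_22": "field.x"}
predicate = ("expr_22", "=", 7)

pushed = inline_assignments(predicate, assignments)
print(pushed)  # ('field.x', '=', 7) -- the predicate now applies directly to the source
```

The real PredicatePushdown works on Expression trees, but the effect is the same: the filter no longer depends on the intermediate Project's outputs.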

(2)

Consider the following input for PushdownDereferencesThroughProject class.

Input:

Project(P1):  [a := b.x]
  |______ Project(P2) [b:=c]
                |______Some node(N): [c] 

If P1 uses a dereference expression on a symbol that is output from N (and not synthesized within P2), then the dereference expression should get pushed down. For this to work effectively, the UnaliasSymbolReferences rule should also run in the same optimizer as the pushdown rules, because there can be cases where projections like [b := c] are present in P2.
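The identity-renaming obstacle can be sketched with a simple string-based symbol model (hypothetical helper, not Trino code): to push the dereference in P1 past P2, the base symbol must first be resolved through the renaming chain.

```python
def resolve_base(symbol, renames):
    """Follow identity-renaming projections (e.g. b := c) down to the
    symbol produced by the underlying node, guarding against cycles."""
    seen = set()
    while symbol in renames and symbol not in seen:
        seen.add(symbol)
        symbol = renames[symbol]
    return symbol

# P1: [a := b.x]  over  P2: [b := c]  over  N producing [c]
renames = {"b": "c"}
print(resolve_base("b", renames))  # 'c' -- so a := b.x can be pushed down as c.x
```

This is roughly what extending PushDereferenceThroughProject to map identity renamings would do, instead of relying on the Unalias optimizer running in between.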

(3)
I think we can simplify PushDownDereferenceThroughSemiJoin#apply: since SemiJoinNode::getOutputSymbols does not include symbols from the filtering source, I believe we only need to create a ProjectNode above the source.

(4)
ExtractDereferenceFromFilter can produce a ProjectNode with identity projections, so RemoveIdentityProjections should always run after it. What do you think?
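For reference, the identity-projection situation is the one where every assignment just forwards a symbol unchanged. A rough sketch of the check (hypothetical names and a string model, not the RemoveIdentityProjections implementation):

```python
def is_identity_projection(assignments):
    """True if every output assignment simply forwards its same-named input
    symbol, i.e. the ProjectNode can be removed without changing the plan."""
    return all(expr == symbol for symbol, expr in assignments.items())

print(is_identity_projection({"x": "x", "y": "y"}))    # True -- removable
print(is_identity_projection({"x": "x", "y": "a.b"}))  # False -- does real work
```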

@martint (Member) commented Jan 31, 2020

For 1), it's certainly possible that plans will be affected, but adding instances of PredicatePushdown should be benign.

For 2), can you clarify what the actual shape of the plan is? The way it's written seems to indicate it's P1(P2(N)), but then the symbol references are broken.

For 3) yes, that's correct.

For 4), that seems reasonable. Make sure to add it to the same IterativeOptimizer, which will take care of running them if necessary.

@phd3 (Member Author) commented Feb 3, 2020

@martint thanks. I've fixed the mix-up in example (2). Please let me know if it's still confusing.

EDIT: I just saw that UnaliasSymbolReferences is a plan optimizer, so it cannot run "with" the projection pushdown rules :/

@martint (Member) commented Feb 4, 2020

Thanks. That makes sense now.

Unfortunately, UnaliasSymbolReferences is not implemented as a Rule (it's tricky to do so -- my plan is to eventually get rid of it anyway). But we might want to extend PushDereferenceThroughProject to be able to map identity-renaming projections. That should handle this scenario without requiring the Unalias optimizer to run in between.

@phd3 (Member Author) commented Feb 7, 2020

Significant changes:

  • The patch now handles cases with longer dereference chains; the issue raised in #1435 (comment) works now. If "a.b.c" and "a.b" are both referenced in the projection, "a.b" is pushed down. The alternative is to push both of them down, but that doesn't seem desirable, since it replicates data down the query plan.

  • PushDownDereferenceThroughFilter is implemented, which pushes down dereferences from a ProjectNode on top of a FilterNode. It also considers dereferences referenced inside the predicate of the FilterNode. (This merges the rules ExtractDereferenceFromFilter and PushdownDereferenceThrough(FilterNode.class, typeAnalyzer), which are now removed.)

  • ExtractDereferenceFromJoin is dropped, and its logic is incorporated into PushDownDereferenceThroughJoin.

  • A new ExtractDereferenceFromFilterAboveTableScan rule is implemented to materialize all dereferences on top of the table scan. This enables pushing the projections into the table scan, and then the predicates on the resulting virtual columns.

  • Predicate pushdown optimizer invocations are added after projection pushdown.
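The first bullet's "push the shortest chain" behavior can be sketched as a prefix filter over the referenced dereference paths (a simplified string model with dotted paths; not the actual Trino code):

```python
def prune_prefixes(derefs):
    """Keep only dereference chains that are not extensions of another
    referenced chain: if both a.b and a.b.c occur, push down just a.b,
    so the pushed-down data is not replicated."""
    kept = set()
    for d in sorted(derefs, key=lambda s: s.count(".")):  # shortest chains first
        if not any(d == p or d.startswith(p + ".") for p in kept):
            kept.add(d)
    return kept

print(sorted(prune_prefixes({"a.b.c", "a.b"})))  # ['a.b']
print(sorted(prune_prefixes({"a.b.c", "a.d"})))  # ['a.b.c', 'a.d']
```

Note the `p + "."` guard: `a.bc` is not treated as an extension of `a.b`.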

@phd3 (Member Author) commented Feb 19, 2020

@losipiuk can you please help review the commit 33c1739 ?

@kokosing (Member)

> @losipiuk can you please help review the commit 33c1739?

@phd3 Unfortunately, Łukasz is no longer actively contributing to Presto.

@JamesRTaylor

With #1720 and #2672 applied on top of 331, I get the exception below when running a query like this:

WITH ALL_OPTIONS AS (
  SELECT ds AS date_trunc, 
         request_state.r,
         COUNT(1) AS total_svr_cnt
    FROM my_data
   WHERE ds >= '2019-12-01' and ds < '2019-12-07'
     AND type = 'c'
     AND contains(type_features,'supports_x')
     AND end_state.timestamp IS NOT NULL
   GROUP BY 1, 2
), 

COALESCED_OPTIONS AS (
    SELECT date_trunc, 
           r, 
           COUNT(1) AS coalesced_data
      FROM (SELECT w.ds AS date_trunc,
                   a.r, 
                   tw.point.job_id
              FROM events.event_w w
              JOIN event_q a 
                ON w.dispatch_option_id = a.dispatch_option_id 
               AND w.ds >= '2019-12-01' and w.ds < '2019-12-07'
               and a.ds >= '2019-12-01' and a.ds < '2019-12-07'
             CROSS JOIN UNNEST(w.plan.timed_data) AS tw
             WHERE tw.point.best_alternate.option.timing.sec > 0
               AND (tw.point.best_alternate.option.supply_arrival_timing.method = 'a'
                    OR 
                    tw.point.best_alternate.option.supply_departure_timing.method = 'a')
              GROUP BY 1, 2, 3
            ) 
     GROUP BY 1, 2
)

SELECT a.date_trunc, 
       a.r,
       a.total_svr_cnt, 
       c.coalesced_data, 
       ROUND(1.0 * c.coalesced_data / a.total_svr_cnt, 3) AS pct_coalesced
  FROM ALL_OPTIONS a 
  JOIN COALESCED_OPTIONS c 
    ON a.date_trunc = c.date_trunc
   AND a.r = c.r 
 ORDER BY 1, 2
java.lang.IllegalStateException: Loaded block positions count (13) doesn't match lazy block positions count (16)
	at io.prestosql.spi.block.LazyBlock$LazyData.load(LazyBlock.java:383)
	at io.prestosql.spi.block.LazyBlock$LazyData.getBlock(LazyBlock.java:354)
	at io.prestosql.spi.block.LazyBlock.getBlock(LazyBlock.java:264)
	at io.prestosql.spi.block.LazyBlock.isNull(LazyBlock.java:253)
	at io.prestosql.$gen.PageFilter_20200327_050345_52.filter(Unknown Source)
	at io.prestosql.$gen.PageFilter_20200327_050345_52.filter(Unknown Source)
	at io.prestosql.operator.project.PageProcessor.createWorkProcessor(PageProcessor.java:121)
	at io.prestosql.operator.ScanFilterAndProjectOperator$SplitToPages.lambda$processPageSource$1(ScanFilterAndProjectOperator.java:269)

Any ideas, @phd3?

@JamesRTaylor

I've tried to find the simplest query where this is occurring and I found this one:

 SELECT MAX(my_col)
 FROM hive.my_schema.my_table
 WHERE ds>='2020-01-01' and ds <= '2020-01-16';

The error doesn't occur with fewer partitions.

@JamesRTaylor commented Mar 31, 2020

Here's a simplified version of the original query that reproduces the issue:

SELECT ds
FROM my_data
WHERE ds = '2020-03-01'
AND end_state.timestamp IS NOT NULL

It's not reproducible without the IS NOT NULL check. It's also not data-related: I can reproduce it with any partition.

@martint (Member) left a comment

Some initial comments.

Review comment on lines 530 to 560 (Member):

Why do we need this?


Reply (Member):

Add a comment explaining this is here for that reason, so that when we revisit things in the future we know why we added it

Review comment on lines 620 to 656 (Member):

Why do we need this?


Reply (Member):

Same here

Review comment (@JamesRTaylor, Apr 12, 2020):

A missing call to RemoveUnsupportedDynamicFilters here is causing this query to be 4x slower:

                     new RemoveUnsupportedDynamicFilters(metadata),

Follow-up:

Upon closer examination, performance is the same without the RemoveUnsupportedDynamicFilters. Sorry for the noise.

@phd3 phd3 added the WIP label Apr 13, 2020
@phd3 phd3 force-pushed the deref-pushdown-in-plan branch from 1fefbb8 to 0224dfb on April 13, 2020
JamesRTaylor pushed a commit to lyft/presto that referenced this pull request Apr 14, 2020
@phd3 phd3 force-pushed the deref-pushdown-in-plan branch 2 times, most recently from d1d0e2f to 7d8e3f0 on April 15, 2020
@phd3 (Member Author) commented Apr 15, 2020

@martint

I think the following optimizers should be invoked (in order) for complete pushdown:

(1) PushProjectionThroughX (plan projection pushdown)
(2) PushProjectionIntoTableScan
(3) PredicatePushdown
(4) PushPredicateIntoTableScan

(1) and (2) are included in the projectionPushdown optimizer. I'm confused about this: should (3) and (4) be invoked every time (1) and (2) are invoked?

In the first pass of this PR, I added (3) but not (4), since the Hive pushdown wasn't implemented yet. After adding the PredicatePushdown optimizer invocations, I had to add simplifyOptimizer to fix some failing tests (as per the advice here: https://github.com/prestosql/presto/blob/master/presto-main/src/main/java/io/prestosql/sql/planner/PlanOptimizers.java#L587). When predicate pushdown was invoked with dynamic filtering, I had to add the removeUnsupportedDynamicFilters optimizer. But I'm not sure this is the right approach to follow, since we go down the rabbit hole of sequencing and duplicating optimizer invocations. I'd appreciate your guidance here.

I've copied tests from TestDereferencePushdown to TestHiveProjectionPushdownIntoTableScan to verify the expected end-to-end projection+predicate pushdown with Hive. I think they'll be good to have on the PR to verify optimizer ordering, but we can skip checking them in, since there is redundancy. (I can create a separate PR that adds only some key representative tests with predicates to TestHiveProjectionPushdownIntoTableScan.)

@phd3 (Member Author) commented Apr 17, 2020

@martint

If there's a constraint in the filter node that the connector can "enforce", PushPredicateIntoTableScan modifies the plan to incorporate the new scan and the residual filter.

In the case of non-partition pushdown for Hive, this is not true. The "compactEffectivePredicate" added to the table handle may or may not be satisfied by the HivePageSource (e.g. ORC vs. Avro), so we cannot get rid of those predicates in the FilterNode. But even though HiveMetadata::applyFilter returns a ConstraintApplicationResult, PushPredicateIntoTableScan throws it away, since arePlansSame doesn't consider a change in table handles if the enforced constraint hasn't changed.

Predicates on top-level columns still get pushed down to ORC/Parquet, because AddExchanges::visitFilter adds these predicates, but I think this just works coincidentally. Please feel free to correct me here. This doesn't work for nested columns, likely due to optimizer invocation ordering.

For this reason, I added a connector table handle comparison in arePlansSame. (That initially failed some tests because BucketFilter equality was reference-based; they now pass.) But I'm not sure whether comparing connector table handles is okay, or whether we need a better way to indicate that there was "some level" of predicate pushdown, even if it's not guaranteed to be enforced. What do you think? I can move this to a separate PR if you think that'd be better.
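A rough model of the arePlansSame change described above (the field names here are illustrative, not Trino's API): the check should treat the plan as changed when the connector table handle differs, even if the enforced constraint is unchanged.

```python
def plans_same(old_scan, new_scan):
    """Sketch: the old check compared only the enforced constraint, so a
    partial (unenforced) pushdown recorded in the table handle was discarded.
    Comparing the table handle too keeps the applyFilter result."""
    return (old_scan["enforced_constraint"] == new_scan["enforced_constraint"]
            and old_scan["table_handle"] == new_scan["table_handle"])

old = {"enforced_constraint": "TRUE", "table_handle": ("hive", "t", None)}
new = {"enforced_constraint": "TRUE", "table_handle": ("hive", "t", "x > 0")}
print(plans_same(old, new))  # False -- the new scan is kept, not thrown away
```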

Edit: Extracted this to #3470

@phd3 phd3 force-pushed the deref-pushdown-in-plan branch from e03c41a to 5259429 on April 22, 2020
@phd3 phd3 requested a review from martint April 22, 2020
@phd3 phd3 force-pushed the deref-pushdown-in-plan branch 3 times, most recently from 6258d4b to 3c54f93 on April 29, 2020

Review comment on lines 61 to 69 (Member):

We can't do this generically. The meaning of how the outputs of a node map to its inputs is node-specific, and we can't make inferences based on the fact that symbols are named the same way. It is legal for symbols to be named the same as long as they have the same type, even if they don't represent the same concept.

Also, for nodes such as Union, there's an internal remapping of inputs to outputs that changes the symbol names associated with the columns, so this would fail to identify them.

Additionally, this won't work for nodes such as Apply or SemiJoin. "Source" generally just means "an input that the operation consumes to achieve its result". In the case of Apply or Semijoin, the right side can be thought of as a subquery that gets applied for every row on the left side. In the case of Apply, there's some internal reduction operation that's done on top of the results of the subquery (semantically speaking). All this is to say that we can't generically map the dereferences in the project to the "sources" of a node -- it has to be done case-by-case.

Reply (Member Author):

Thanks @martint, that makes sense. This has been incorporated now by adding separate rules for every node, rather than trying to overfit a generic pattern. I think this suggestion has also helped with code readability and reasoning.

All the rules now follow the following algorithm at a high level:

ProjectNode P
    Node N
        sources S
  • Extract dereference expressions (and create new symbols for them) from E1 ∪ E2, where E1 is the set of projection assignment expressions and E2 is the set of expressions used in the node N itself (e.g. the predicate in a FilterNode, or function expressions in a WindowNode). The extraction logic is DereferencePushdown::validDereferences.

  • Exclude those dereference expressions whose base symbol is used as-is in the node N itself. Call the remaining dereferences E'.

  • Push down E' by creating ProjectNodes between N and S. Rewrite the assignments in P and the expressions in N to replace dereference expressions with the new symbols.
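Under a simplified string-based expression model (dotted paths stand for dereference chains; none of these names are the actual Trino classes), the three steps look roughly like:

```python
def pushdown(project_assignments, node_exprs, base_used_in_node):
    """Sketch of the per-rule algorithm: (1) collect dereferences from the
    projection assignments and the node's own expressions, (2) drop those
    whose base symbol the node uses as-is, (3) synthesize a symbol for each
    remaining dereference, forming the new projection to insert between the
    node and its sources."""
    derefs = {e for e in list(project_assignments.values()) + node_exprs
              if "." in e}                                        # step 1: E1 U E2
    pushable = {d for d in derefs
                if d.split(".")[0] not in base_used_in_node}      # step 2: E'
    return {d.replace(".", "_"): d for d in sorted(pushable)}     # step 3

new_project = pushdown({"x": "msg.y"}, ["msg.x"], base_used_in_node=set())
print(new_project)  # {'msg_x': 'msg.x', 'msg_y': 'msg.y'}
```

The real rules then rewrite the original ProjectNode and the node's expressions to reference the synthesized symbols instead of the dereference expressions.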

@phd3 phd3 added the WIP label May 2, 2020
@phd3 phd3 force-pushed the deref-pushdown-in-plan branch from b35bcf5 to 86e5d86 on May 6, 2020
@phd3 phd3 removed the WIP label May 6, 2020
@phd3 phd3 requested a review from martint May 6, 2020
@phd3 phd3 force-pushed the deref-pushdown-in-plan branch from 86e5d86 to b163819 on May 19, 2020
@phd3 (Member Author) commented May 19, 2020

The PushLimitThroughProject and PushDereferenceThroughLimit rules undo each other's work, so I've modified both of them not to fire when the projection contains only dereferences. It should work the following way:

Initial Plan:

Limit
	Project (x := f(a.b) + g(c.d), y := h(c))

1. PushLimitThroughProject

Project (x := f(a.b) + g(c.d), y := h(c))
	Limit

2. PushDereferencesThroughLimit

Project (x := f(z) + g(c.d), y := h(c))
	Limit
		Project(z := a.b, c)

3. PushLimitThroughProject --> no effect.

Another option is to actually split the ProjectNode in PushLimitThroughProject into two project nodes, where the lower one only has dereferences; the rule would then push the limit only through the upper node. But I feel the current approach is simpler and has the same overall effect.

PushLimitThroughProject avoids limit pushdown only when the projection consists exclusively of dereferences, so the limit still goes below overlapping dereferences. For example, we still push the limit in the following case. I think this is better than not pushing the limit at all, since PushDereferenceThroughLimit will push the necessary dereferences down.

Limit (2)
	Project (x := a.b.c, y := a.b)

1. PushLimitThroughProject

Project (x := a.b.c, y := a.b)
	Limit (2)

2. PushDereferenceThroughLimit

Project (x := a_b.c, y := a_b)
	Limit (2)
		Project(a_b := a.b, x, y)

3. PushLimitThroughProject --> no effect.
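The guard both rules now share can be sketched as a check that a projection contains nothing but bare symbols and dereference chains (a regex over the string model used above; the real rules inspect Expression trees):

```python
import re

def only_dereferences(assignments):
    """True if every assignment is a bare symbol or a dereference chain --
    the case in which PushLimitThroughProject should not fire, so the two
    rules stop undoing each other's work."""
    return all(re.fullmatch(r"\w+(\.\w+)*", expr) for expr in assignments.values())

print(only_dereferences({"z": "a.b", "c": "c"}))    # True -- limit stays put
print(only_dereferences({"x": "f(a.b) + g(c.d)"}))  # False -- limit is pushed
```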

@phd3 phd3 force-pushed the deref-pushdown-in-plan branch 2 times, most recently from a590279 to e37684b on May 19, 2020
@phd3 phd3 force-pushed the deref-pushdown-in-plan branch from e37684b to ff420b3 on May 21, 2020
@martint martint self-assigned this May 21, 2020
@phd3 phd3 force-pushed the deref-pushdown-in-plan branch from ff420b3 to f95f644 on May 21, 2020
@martint (Member) commented May 21, 2020

Oracle failure is unrelated.

@martint martint merged commit 5dedde4 into trinodb:master May 21, 2020
@martint martint added this to the 334 milestone May 21, 2020
@martint martint mentioned this pull request May 22, 2020