Add optimization to rewrite left join with array contains to equi-join #21420

Merged
feilong-liu merged 1 commit into prestodb:master from feilong-liu:left_join_or
Dec 15, 2023

Conversation

@feilong-liu
Contributor

@feilong-liu feilong-liu commented Nov 20, 2023

Description

A left join whose only join condition is an array contains predicate, with no equi-join clause, is planned as a join with a filter and no equi-join condition; such a join can only be executed as a broadcast join and cannot be partitioned. This PR adds an optimization that rewrites the plan into an equi-join.
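The rewrite's semantics can be sketched outside the planner. The following Python model is illustrative only (the row layout, function names, and column names are not from the PR's code): it simulates the original contains-based left join and the rewritten unnest-plus-equi-join, and checks that they produce the same rows.

```python
def contains_left_join(left, right):
    # Original plan: every left row is compared against every right row's array,
    # so there is no equi condition and the join must be broadcast.
    out = []
    for l in left:
        matches = [r for r in right if l["k"] in r["arr"]]
        if matches:
            out.extend({**l, "t1_k": r["k"]} for r in matches)
        else:
            out.append({**l, "t1_k": None})
    return out

def unnest_equi_join(left, right):
    # Rewritten plan: unnest the de-duplicated array into (element, row) pairs,
    # then left-equi-join on element = k, which can be hash-partitioned.
    unnested = [(e, r) for r in right for e in set(r["arr"])]
    out = []
    for l in left:
        matches = [r for (e, r) in unnested if e == l["k"]]
        if matches:
            out.extend({**l, "t1_k": r["k"]} for r in matches)
        else:
            out.append({**l, "t1_k": None})
    return out

t1 = [{"arr": [1, 2, 3], "k": 10}, {"arr": [4, 5, 6], "k": 11}]
t2 = [{"k": 1, "v": "a"}, {"k": 4, "v": "b"}, {"k": 9, "v": "c"}]
assert contains_left_join(t2, t1) == unnest_equi_join(t2, t1)
```

The de-duplication step (`set(...)` here, `array_distinct` in the actual plan) is what keeps the rewrite from multiplying matched left rows.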

Motivation and Context

To improve performance for left joins with an array contains condition.

Impact

Improves performance for left joins with an array contains condition.

Test Plan

Unit test and verifier suite

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Add an optimization to rewrite a left join with an array contains predicate in the join condition into an equi-join. The optimization is controlled by the session property `rewrite_left_join_array_contains_to_equi_join`

@feilong-liu feilong-liu requested a review from a team as a code owner November 20, 2023 07:04
@feilong-liu feilong-liu marked this pull request as draft November 20, 2023 07:04
@feilong-liu feilong-liu marked this pull request as ready for review November 28, 2023 01:19
@vivek-bharathan vivek-bharathan self-requested a review November 28, 2023 05:17
Contributor

@vivek-bharathan vivek-bharathan left a comment

This looks good to me overall. Just one open question - do you think there is any benefit in making this optimization a cost-based decision? Maybe based on the cardinality of the array? Maybe based on HBO?

Contributor

Maybe remove nulls too?

Contributor Author

Done.
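The suggestion above (removing nulls, on top of de-duplicating the array before unnesting) matters for correctness, not just tidiness: contains produces at most one match per right row, while unnesting a raw array would produce one match per occurrence of the key. A small illustration (variable names are illustrative only):

```python
# Left join on contains(arr, k): a matching left row joins each right row once,
# no matter how many times k occurs inside arr.
arr = [1, 1, None, 2]
k = 1

rows_from_contains = 1 if k in arr else 0                 # contains: at most one match
rows_from_plain_unnest = sum(1 for e in arr if e == k)    # duplicates inflate matches

# De-duplicating and dropping nulls (which can never equal k) restores
# the contains semantics before the equi-join:
dedup = {e for e in arr if e is not None}
rows_from_distinct_unnest = sum(1 for e in dedup if e == k)

assert rows_from_plain_unnest == 2   # would wrongly duplicate the left row
assert rows_from_distinct_unnest == rows_from_contains == 1
```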

@aaneja
Contributor

aaneja commented Nov 30, 2023

any benefit in making this optimization a cost-based decision? Maybe based on the cardinality of the array? Maybe based on HBO?

Some of the problems to accomplish this are -

  1. Stats are not propagated correctly for an UnnestNode, so cost estimates for parent nodes will not work anyway
  2. We don't store stats for any nested types AFAIK, so estimating the array cardinality is not easy
  3. Function calls don't propagate stats - Better estimates needed for outputs of function calls #20513

@feilong-liu
A side-effect of not propagating stats is that the join distribution type would be incorrectly chosen; see the example below from the test case -

explain with t1 as (select * from (values (array[1, 2, 3], 10), (array[4, 5, 6], 11)) t(arr, k)), t2 as (select * from (values (1, 'a'), (4, 'b')) t(k, v)) select t1.k, t2.k, t2.v from t2 left join t1 on contains(t1.arr, t2.k);

Without optimization

A Broadcast/Replicated Join is chosen

 - Output[PlanNodeId 26][k, k, v] => [field_17:integer, field:integer, field_0:varchar(1)]                            
         Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 112.00, network: 0.00}                 
         k := field_17 (1:164)                                                                                        
         k := field (1:170)                                                                                           
         v := field_0 (1:176)                                                                                         
     - LeftJoin[PlanNodeId 21][contains(field_16, field)] => [field:integer, field_0:varchar(1), field_17:integer]    
             Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 112.00, network: 0.00}             
             Distribution: REPLICATED                                                                                 
         - LocalExchange[PlanNodeId 396][ROUND_ROBIN] () => [field:integer, field_0:varchar(1)]                       
                 Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 120.00, memory: 0.00, network: 0.00}   
             - Values[PlanNodeId 0] => [field:integer, field_0:varchar(1)]                                            
                     Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 0.00, memory: 0.00, network: 0.00} 
                     (INTEGER'1', VARCHAR'a')                                                                         
                     (INTEGER'4', VARCHAR'b')                                                                         
         - Values[PlanNodeId 9] => [field_16:array(integer), field_17:integer]                                        
                 Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 0.00, memory: 0.00, network: 0.00}     
                 ([Block: position count: 3; size: 68 bytes], INTEGER'10')                                            
                  ([Block: position count: 3; size: 68 bytes], INTEGER'11')

With optimization

A Partitioned Join is chosen, even though the build side would be small enough to replicate

 - Output[PlanNodeId 26][k, k, v] => [field_17:integer, field:integer, field_0:varchar(1)]                                                            
         k := field_17 (1:164)                                                                                                                        
         k := field (1:170)                                                                                                                           
         v := field_0 (1:176)                                                                                                                         
     - RemoteStreamingExchange[PlanNodeId 446][GATHER] => [field_17:integer, field:integer, field_0:varchar(1)]                                       
         - RightJoin[PlanNodeId 297][("unnest" = "field")][$hashvalue, $hashvalue_47] => [field_17:integer, field:integer, field_0:varchar(1)]        
                 Distribution: PARTITIONED                                                                                                            
             - RemoteStreamingExchange[PlanNodeId 444][REPARTITION][$hashvalue] => [field_17:integer, unnest:integer, $hashvalue:bigint]              
                 - Project[PlanNodeId 529][projectLocality = LOCAL] => [field_17:integer, unnest:integer, $hashvalue_46:bigint]                       
                         $hashvalue_46 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(unnest), BIGINT'0'))                                   
                     - Unnest[PlanNodeId 296][replicate=field_17:integer, unnest=array_distinct:array(integer)] => [field_17:integer, unnest:integer] 
                         - Project[PlanNodeId 295][projectLocality = LOCAL] => [field_17:integer, array_distinct:array(integer)]                      
                                 Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 224.00, memory: 0.00, network: 0.00}                   
                                 array_distinct := array_distinct(field_16) (1:22)                                                                    
                             - LocalExchange[PlanNodeId 487][ROUND_ROBIN] () => [field_16:array(integer), field_17:integer]                           
                                     Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 112.00, memory: 0.00, network: 0.00}               
                                 - Values[PlanNodeId 9] => [field_16:array(integer), field_17:integer]                                                
                                         Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 0.00, memory: 0.00, network: 0.00}             
                                         ([Block: position count: 3; size: 68 bytes], INTEGER'10')                                                    
                                         ([Block: position count: 3; size: 68 bytes], INTEGER'11')                                                    
             - LocalExchange[PlanNodeId 488][HASH][$hashvalue_47] (field) => [field:integer, field_0:varchar(1), $hashvalue_47:bigint]                
                     Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 414.00, memory: 0.00, network: 138.00}                             
                 - RemoteStreamingExchange[PlanNodeId 445][REPARTITION][$hashvalue_48] => [field:integer, field_0:varchar(1), $hashvalue_48:bigint]   
                         Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 276.00, memory: 0.00, network: 138.00}                         
                     - Project[PlanNodeId 530][projectLocality = LOCAL] => [field:integer, field_0:varchar(1), $hashvalue_49:bigint]                  
                             Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 138.00, memory: 0.00, network: 0.00}                       
                             $hashvalue_49 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(field), BIGINT'0')) (1:121)                        
                         - Values[PlanNodeId 0] => [field:integer, field_0:varchar(1)]                                                                
                                 Estimates: {source: CostBasedSourceInfo, rows: 2 (130B), cpu: 0.00, memory: 0.00, network: 0.00}                     
                                 (INTEGER'1', VARCHAR'a')                                                                                             
                                 (INTEGER'4', VARCHAR'b')  

IMO, we should keep the default for this feature flag to false

Contributor

does the join criteria need to be empty? if we have a query like this

r left join s on (a=b and contains(...))

wouldn't we benefit from applying the same transformation?

Contributor Author

For this case, the two tables will be partitioned on a and b, and the join is already an equi-join. The benefit of applying this transformation there is unclear.

Contributor

perhaps you can use Stream.findFirst to do this, something like this:

Optional<RowExpression> arrayContains = andConjuncts.stream().findFirst(...)
if (!arrayContains.isPresent()) ...

Contributor Author

I think using findFirst will lead to code like andConjuncts.stream().filter(...).findFirst().
The logic in the for loop is not just one or two simple functions, so I think it may be better to keep the for loop here.

Contributor

you're right, it requires a function with a lambda expression to be applied first but I think the function is the same few lines you have below (and can even be refactored into a function):

Optional<RowExpression> arrayContains = andConjuncts.stream().filter(conjunct -> isSupportedArrayContains(conjunct)).findFirst();
if (arrayContains.isPresent()) {
    remainingConjuncts = andConjuncts.stream().filter(x -> !x.equals(arrayContains.get())).collect(toImmutableList());
}
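The pattern being discussed here (find the first element matching a predicate, then keep the rest) can be sketched language-neutrally. A minimal Python version, with illustrative names only:

```python
def split_first_match(items, pred):
    """Return (first item matching pred, remaining items), or (None, items)."""
    first = next((x for x in items if pred(x)), None)
    if first is None:
        return None, list(items)
    # Exclude exactly the matched element by identity, keeping order.
    remaining = [x for x in items if x is not first]
    return first, remaining

conjuncts = ["a = b", "contains(arr, k)", "c > 5"]
match, rest = split_first_match(conjuncts, lambda c: c.startswith("contains"))
assert match == "contains(arr, k)"
assert rest == ["a = b", "c > 5"]
```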

Contributor Author

Done

Contributor

these are set at the same time so we probably don't need to check the disjunction.
Also, this may not be necessary if you rewrite it with the findFirst suggestion above

Contributor

Safer! So I don't mind

@feilong-liu feilong-liu force-pushed the left_join_or branch 2 times, most recently from e61e34e to e938638 Compare December 6, 2023 00:53
@feilong-liu
Contributor Author

any benefit in making this optimization a cost-based decision? Maybe based on the cardinality of the array? Maybe based on HBO?

[...]

IMO, we should keep the default for this feature flag to false

Agree that the UNNEST cardinality estimation is a problem we need to solve; hopefully HBO will be able to resolve most of it. I've made it default to false per the suggestion.

@feilong-liu feilong-liu requested a review from mlyublena December 7, 2023 18:55
Contributor

This looks like we're doing an n^2 computation: scanning andConjuncts again in a for loop on top of andConjuncts.
I know that the break below makes it linear rather than quadratic, but I think it is confusing and less readable that way.

Contributor Author

Done

@feilong-liu
Contributor Author

This looks good to me overall. Just one open question - do you think there is any benefit in making this optimization a cost-based decision? Maybe based on the cardinality of the array? Maybe based on HBO?

Yeah, ideally a cost-based decision would be better, but in most cases this optimization is beneficial, as it avoids doing a cartesian product for the join.
I changed the session property for this optimization to an enum instead of a boolean, so we can add a cost_based option for it when we implement a cost-based decision based on HBO later.

@feilong-liu feilong-liu merged commit 87688fb into prestodb:master Dec 15, 2023
@feilong-liu feilong-liu deleted the left_join_or branch December 15, 2023 07:13
@wanglinsong wanglinsong mentioned this pull request Feb 12, 2024