Push DictionaryBlock through remote partitioned exchange #14937

Merged
raunaqmorarka merged 2 commits into trinodb:master from starburstdata:ls/051-poo-dictionary-support
Apr 18, 2023

Conversation

@lukasz-stec (Member) commented Nov 7, 2022

Description

Dictionary-encoded blocks are currently flattened by the partitioned exchange operator. This prevents dictionary-based optimizations from taking advantage of the encoded blocks (or results in additional overhead).
This PR adds support for passing dictionary-encoded blocks through the partitioned exchange for the case where the same dictionary (the same Java object) is used by subsequent blocks sent through PartitionedOutputOperator.
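The condition above can be sketched as an identity check on the dictionary object. This is a minimal hypothetical sketch (illustrative names, not the actual Trino code): the dictionary encoding survives the exchange only while consecutive blocks reference the exact same dictionary instance.

```java
import java.util.List;

// Hypothetical sketch of the same-dictionary condition described above;
// names are illustrative, not the actual Trino implementation.
public class DictionaryPassthroughSketch
{
    // A stand-in for DictionaryBlock: ids pointing into a shared dictionary.
    record DictBlock(Object dictionary, int[] ids) {}

    // The dictionary encoding can be preserved only if every block references
    // the exact same dictionary instance (reference equality, not equals()),
    // mirroring "the same Java object" in the description.
    static boolean canPushDictionary(List<DictBlock> blocks)
    {
        if (blocks.isEmpty()) {
            return false;
        }
        Object first = blocks.get(0).dictionary();
        for (DictBlock block : blocks) {
            if (block.dictionary() != first) {
                return false; // different instance: fall back to flat blocks
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        Object sharedDictionary = new String[] {"AUTOMOBILE", "BUILDING"};
        DictBlock a = new DictBlock(sharedDictionary, new int[] {0, 1, 0});
        DictBlock b = new DictBlock(sharedDictionary, new int[] {1, 1});
        // Equal content but a different instance breaks the passthrough.
        DictBlock c = new DictBlock(new String[] {"AUTOMOBILE", "BUILDING"}, new int[] {0});

        if (!canPushDictionary(List.of(a, b))) {
            throw new AssertionError("same instance should allow passthrough");
        }
        if (canPushDictionary(List.of(a, c))) {
            throw new AssertionError("different instances must fall back to flat blocks");
        }
        System.out.println("ok");
    }
}
```

Reference equality is deliberate here: comparing dictionary contents on every block would add exactly the kind of per-block overhead the optimization is trying to avoid.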

Beyond possible CPU optimizations, transmitting DictionaryBlocks over the network is more efficient than transmitting flat blocks.
As an example, for a query on the tpch schema (sf10) encoded in ORC files:

explain analyze verbose select mktsegment from customer c, nation n where c.nationkey = n.nationkey;

mktsegment is dictionary encoded.

When we look at the customer table scan, we see:

 Fragment 2 [SOURCE]                                                                                                                                                                                                     
     CPU: 308.06ms, Scheduled: 368.72ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1500000 rows (18.62MB); per task: avg.: 500000.00 std.dev.: 35568.30, Output: 1500000 rows (45.78MB)                     
     Output buffer active time: 385.00ms, buffer utilization distribution (%): {p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=0.01, p90=2.97, p95=3.04, p99=3.10, max=6.15}                                      
     Output layout: [nationkey, mktsegment, $hashvalue_6]                                                                                                                                                                
     Output partitioning: HASH [nationkey][$hashvalue_6]                                                                                                                                                                 
     ScanFilterProject[table = hive:tpch:customer, dynamicFilters = {"nationkey" = #df_332}]                                                                                                                             
         Layout: [nationkey:bigint, mktsegment:varchar(10), $hashvalue_6:bigint]                                                                                                                                         
         Estimates: {rows: 1500000 (45.78MB), cpu: 32.90M, memory: 0B, network: 0B}/{rows: 1500000 (45.78MB), cpu: 32.90M, memory: 0B, network: 0B}/{rows: 1500000 (45.78MB), cpu: 45.78M, memory: 0B, network: 0B}      
         CPU: 309.00ms (59.54%), Scheduled: 368.00ms (60.83%), Blocked: 0.00ns (0.00%), Output: 1500000 rows (45.78MB)                                                                                                   
         connector metrics:                                                                                                                                                                                              
           'OrcReaderCompressionFormat_ZLIB' = LongCount{total=75868244}                                                                                                                                                 
           'Physical input read time' = {duration=5.19ms}                                                                                                                                                                
         metrics:                                                                                                                                                                                                        
           'Blocked time distribution (s)' = {count=3.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=0.00, p90=0.00, p95=0.00, p99=0.00, min=0.00, max=0.00}                                                  
           'CPU time distribution (s)' = {count=3.00, p01=0.06, p05=0.06, p10=0.06, p25=0.06, p50=0.06, p75=0.07, p90=0.07, p95=0.07, p99=0.07, min=0.06, max=0.07}                                                      
           'Input block types' = {}                                                                                                                                                                                      
           'Input rows distribution' = {count=3.00, p01=467180.00, p05=467180.00, p10=467180.00, p25=467180.00, p50=483398.00, p75=549422.00, p90=549422.00, p95=549422.00, p99=549422.00, min=467180.00, max=549422.00} 
           'Output block types' = {DictionaryBlock=LongCount{total=312}, LongArrayBlock=LongCount{total=922}, VariableWidthBlock=LongCount{total=149}}                                                                   
           'Projection CPU time' = {duration=612.80us}                                                                                                                                                                   
           'Scheduled time distribution (s)' = {count=3.00, p01=0.07, p05=0.07, p10=0.07, p25=0.07, p50=0.07, p75=0.07, p90=0.07, p95=0.07, p99=0.07, min=0.07, max=0.07}                                                
         Input avg.: 500000.00 rows, Input std.dev.: 7.11%                                                                                                                                                               
         $hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("nationkey"), 0))                                                                                                                       
         nationkey := nationkey:bigint:REGULAR                                                                                                                                                                           
         mktsegment := mktsegment:varchar(10):REGULAR                                                                                                                                                                    
         Input: 1500000 rows (18.62MB), Filtered: 0.00%                                                                                                                                                                  
         Dynamic filters:                                                                                                                                                                                                
             - df_332, [ SortedRangeSet[type=bigint, ranges=25, {[0], ..., [24]}] ], collection time=31.86ms    

and with this optimization:

 Fragment 2 [SOURCE]                                                                                                                                                                                                     
     CPU: 294.82ms, Scheduled: 317.47ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1500000 rows (18.62MB); per task: avg.: 500000.00 std.dev.: 35568.30, Output: 1500000 rows (34.43MB)                     
     Output buffer active time: 342.36ms, buffer utilization distribution (%): {p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=0.06, p90=3.40, p95=3.58, p99=3.94, max=7.09}                                      
     Output layout: [nationkey, mktsegment, $hashvalue_6]                                                                                                                                                                
     Output partitioning: HASH [nationkey][$hashvalue_6]                                                                                                                                                                 
     ScanFilterProject[table = hive:tpch:customer, dynamicFilters = {"nationkey" = #df_332}]                                                                                                                             
         Layout: [nationkey:bigint, mktsegment:varchar(10), $hashvalue_6:bigint]                                                                                                                                         
         Estimates: {rows: 1500000 (45.78MB), cpu: 32.90M, memory: 0B, network: 0B}/{rows: 1500000 (45.78MB), cpu: 32.90M, memory: 0B, network: 0B}/{rows: 1500000 (45.78MB), cpu: 45.78M, memory: 0B, network: 0B}      
         CPU: 294.00ms (59.51%), Scheduled: 317.00ms (59.36%), Blocked: 0.00ns (0.00%), Output: 1500000 rows (34.43MB)                                                                                                   
         connector metrics:                                                                                                                                                                                              
           'OrcReaderCompressionFormat_ZLIB' = LongCount{total=75868244}                                                                                                                                                 
           'Physical input read time' = {duration=15.62ms}                                                                                                                                                               
         metrics:                                                                                                                                                                                                        
           'Blocked time distribution (s)' = {count=3.00, p01=0.00, p05=0.00, p10=0.00, p25=0.00, p50=0.00, p75=0.00, p90=0.00, p95=0.00, p99=0.00, min=0.00, max=0.00}                                                  
           'CPU time distribution (s)' = {count=3.00, p01=0.06, p05=0.06, p10=0.06, p25=0.06, p50=0.06, p75=0.06, p90=0.06, p95=0.06, p99=0.06, min=0.06, max=0.06}                                                      
           'Input block types' = {}                                                                                                                                                                                      
           'Input rows distribution' = {count=3.00, p01=467180.00, p05=467180.00, p10=467180.00, p25=467180.00, p50=483398.00, p75=549422.00, p90=549422.00, p95=549422.00, p99=549422.00, min=467180.00, max=549422.00} 
           'Output block types' = {DictionaryBlock=LongCount{total=312}, LongArrayBlock=LongCount{total=922}, VariableWidthBlock=LongCount{total=149}}                                                                   
           'Projection CPU time' = {duration=498.71us}                                                                                                                                                                   
           'Scheduled time distribution (s)' = {count=3.00, p01=0.06, p05=0.06, p10=0.06, p25=0.06, p50=0.06, p75=0.07, p90=0.07, p95=0.07, p99=0.07, min=0.06, max=0.07}                                                
         Input avg.: 500000.00 rows, Input std.dev.: 7.11%                                                                                                                                                               
         $hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("nationkey"), 0))                                                                                                                       
         nationkey := nationkey:bigint:REGULAR                                                                                                                                                                           
         mktsegment := mktsegment:varchar(10):REGULAR                                                                                                                                                                    
         Input: 1500000 rows (18.62MB), Filtered: 0.00%                                                                                                                                                                  
         Dynamic filters:                                                                                                                                                                                                
             - df_332, [ SortedRangeSet[type=bigint, ranges=25, {[0], ..., [24]}] ], collection time=33.37ms 

So Output: 1500000 rows (45.78MB) goes down to Output: 1500000 rows (34.43MB).

Non-technical explanation

Increase dictionary-encoded block usage in the engine.

Release notes

(x) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Nov 7, 2022
@lukasz-stec lukasz-stec force-pushed the ls/051-poo-dictionary-support branch from 5124e8a to fea1f25 Compare November 8, 2022 20:37
@lukasz-stec lukasz-stec marked this pull request as ready for review November 14, 2022 16:44
Member:

Why not use IntArrayList?

Member Author:

I need to calculate getRetainedSizeInBytes. For that, the actual size of the backing array is needed, and IntArrayList does not expose that information.
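The concern above can be illustrated with a small sketch (hypothetical names and an assumed instance-overhead constant, not Trino's actual code): retained-size accounting must be driven by the allocated capacity of the backing array, not the logical element count.

```java
import static java.lang.Math.max;

// Hypothetical sketch of why a plain int[] is kept directly: retained-size
// accounting needs the length of the backing array, not just the logical
// size. Names and the overhead constant are illustrative assumptions.
public class IntBufferSketch
{
    private static final long INSTANCE_SIZE = 16; // assumed object overhead, for illustration

    private int[] values = new int[4];
    private int size;

    void add(int value)
    {
        if (size == values.length) {
            // Grow by roughly 1.5x when full.
            values = java.util.Arrays.copyOf(values, max(4, values.length + (values.length >> 1)));
        }
        values[size++] = value;
    }

    // Retained size reflects values.length (allocated capacity), which can be
    // larger than the number of elements actually stored.
    long getRetainedSizeInBytes()
    {
        return INSTANCE_SIZE + (long) values.length * Integer.BYTES;
    }

    public static void main(String[] args)
    {
        IntBufferSketch buffer = new IntBufferSketch();
        for (int i = 0; i < 5; i++) {
            buffer.add(i);
        }
        // After growing past 4 elements, capacity (and retained size) exceeds
        // what the logical size alone would suggest.
        if (buffer.getRetainedSizeInBytes() <= INSTANCE_SIZE + 5L * Integer.BYTES) {
            throw new AssertionError("retained size should reflect allocated capacity");
        }
        System.out.println("ok");
    }
}
```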

@lukasz-stec (Member Author) left a review:

CA


@lukasz-stec lukasz-stec requested a review from Dith3r November 15, 2022 09:21
Member:

Rename this to DictionaryAwarePositionsAppender. We usually just have DictionaryAwareXXX, which handles both RLE and dicts.

Member Author:

I would leave the name unchanged. Unnesting is a more important function of this class. Handling dictionaries is an additional feature.

Member:

> Unnesting is a more important function of this class.

Unnesting became almost irrelevant at this point, since now RLE and dicts cannot be nested.

Member:

Keep state top-level, as in io.trino.operator.output.RleAwarePositionsAppender.

Member Author:

Since this class now has two responsibilities (unnesting and building a dictionary), it's more readable to separate them. It also makes it easier to reset the state of the appender.

@sopel39 (Member) commented Nov 15, 2022:

> Since this class has now two responsibilities (unnesting and building a dictionary)

This class doesn't really unnest much now. RleAwarePositionsAppender could do:

        if (source instanceof RunLengthEncodedBlock) {
            delegate.appendRle(((RunLengthEncodedBlock) source).getValue(), positions.size());
        }

itself at this point, so UnnestingPositionsAppender would be all about dictionaries.

Member Author:

RleAwarePositionsAppender is not always there. The unnesting part actually makes sure only flat blocks are passed down to the flat appenders.
Maybe we should merge UnnestingPositionsAppender and RleAwarePositionsAppender into BlockTypeAwarePositionsAppender? Although I fear it would make the code messier.

Member:

> RleAwarePositionsAppender is not always there.

Why wouldn't it be always there?

Member Author:

It's not needed if the type is not comparable.

@sopel39 (Member) commented Nov 15, 2022:

> it's not needed if the type is not comparable

I don't think it's worth the extra complexity, since there are not many types like that.

However, you could then have a minimal UnnestingPositionsAppender without RLE or dictionary builder support.

I don't think mixing the current UnnestingPositionsAppender with dictionary awareness is needed.

@lukasz-stec (Member Author) left a review:

CA



@lukasz-stec lukasz-stec requested a review from sopel39 November 15, 2022 10:13
@lukasz-stec lukasz-stec force-pushed the ls/051-poo-dictionary-support branch from ba0e02c to a754634 Compare November 15, 2022 16:18
@lukasz-stec lukasz-stec marked this pull request as draft November 15, 2022 16:19
@lukasz-stec (Member Author):

Working on a performance regression, so converted temporarily to a draft.

@lukasz-stec lukasz-stec force-pushed the ls/051-poo-dictionary-support branch 9 times, most recently from a2e3297 to afe4bd5 Compare March 30, 2023 14:21
@lukasz-stec lukasz-stec force-pushed the ls/051-poo-dictionary-support branch 3 times, most recently from cbff50c to 41bf198 Compare April 3, 2023 15:16
@lukasz-stec lukasz-stec force-pushed the ls/051-poo-dictionary-support branch from 41bf198 to a731d5f Compare April 4, 2023 10:52
@lukasz-stec lukasz-stec marked this pull request as ready for review April 4, 2023 20:40
@lukasz-stec lukasz-stec force-pushed the ls/051-poo-dictionary-support branch from a731d5f to b4514ae Compare April 5, 2023 06:08
@lukasz-stec (Member Author):

Rebased on master with the OrcReader#MAX_BATCH_SIZE change.

Member:

Why is the dictionary flushed in every case here, but in the other append it is flushed only for non-dictionary cases?

Member Author:

I think we flush in every case other than when we have the same dictionary (dictionaryBlockBuilder.canAppend, to be precise).
We could also do it here as well, at the cost of !closed && (dictionary == this.dictionary || this.dictionary == null) for every row. I will try to benchmark how much this impacts the performance of row-by-row processing.

The DictionaryBlock is pushed through only if all the input blocks are DictionaryBlocks and use the same instance of DictionaryBlock.dictionary.
This limits the negative impact of using dictionaries due to megamorphic calls while still getting the benefit of transporting dictionary blocks over the network.
@lukasz-stec lukasz-stec force-pushed the ls/051-poo-dictionary-support branch from b4514ae to 478b4f5 Compare April 5, 2023 07:44
@lukasz-stec (Member Author) left a review:

Comments answered and addressed.


@lukasz-stec (Member Author):

CI failed with fatal: unable to access 'https://github.com/airlift/jvmkill/': The requested URL returned error: 429 🤷

        else {
            newSize = initialEntryCount;
        }
        newSize = Math.max(newSize, capacity);
Member:

Is there any possibility that newSize will be lower than capacity?

Member Author:

Well, yes: capacity can be bigger than initialEntryCount, or bigger than 1.5 * dictionaryIds.length (see calculateNewArraySize).

Member:

Why not just do int newSize = calculateNewArraySize(max(initialEntryCount, capacity, dictionaryIds.length))?

Of course, this will work differently than now. If capacity is 100, it will create an array of size 150.

Member Author:

We do not want to go over initialEntryCount, as the appender can be "pre-sized" in #reset.
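The sizing discussion above can be sketched as follows. This is a hypothetical reconstruction under assumptions (illustrative names, and an assumed 1.5x growth policy for calculateNewArraySize), not the PR's actual code: the new size comes from either geometric growth or the pre-sized initial entry count, then is clamped from below by the requested capacity.

```java
import static java.lang.Math.max;

// Hypothetical sketch of the array sizing logic discussed above.
// Names and the 1.5x growth factor are assumptions for illustration.
public class ArraySizingSketch
{
    // Mimics a calculateNewArraySize-style 1.5x growth policy (assumed).
    static int calculateNewArraySize(int currentLength)
    {
        return currentLength + (currentLength >> 1);
    }

    static int newSize(int currentLength, int initialEntryCount, int requiredCapacity, boolean growing)
    {
        int newSize;
        if (growing) {
            newSize = calculateNewArraySize(currentLength);
        }
        else {
            // First allocation honors the pre-sized initial entry count.
            newSize = initialEntryCount;
        }
        // requiredCapacity may exceed both the initial entry count and the
        // grown length, so clamp from below.
        return max(newSize, requiredCapacity);
    }

    public static void main(String[] args)
    {
        // Pre-sized initial entry count wins over a small required capacity.
        if (newSize(0, 1024, 100, false) != 1024) {
            throw new AssertionError();
        }
        // A large required capacity wins over geometric growth.
        if (newSize(64, 1024, 200, true) != 200) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```

Folding everything into one calculateNewArraySize(max(...)) call, as suggested above, would over-allocate when capacity alone drives the size (100 would become 150), which is why the clamp is applied after choosing the base size.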

@sopel39 (Member) commented Apr 13, 2023:

@lukasz-stec what are we waiting for here?

@lukasz-stec (Member Author):

Stable benchmarks, mainly to see the impact of dictionary block flattening.

@raunaqmorarka raunaqmorarka merged commit 53f0dc7 into trinodb:master Apr 18, 2023
@raunaqmorarka raunaqmorarka deleted the ls/051-poo-dictionary-support branch April 18, 2023 08:48
@github-actions github-actions bot added this to the 414 milestone Apr 18, 2023