Support shuffle on Hive partition columns before write #13969
wenleix wants to merge 1 commit into prestodb:master
Conversation
cc @mbasmanova, @kaikalur, @aweisberg. Note that it now writes exactly one file per partition, which might slow down the table write. We might want to introduce an extra local round-robin shuffle to increase the number of file writers if necessary.
private static final String PARTITIONS_TABLE_SUFFIX = "$partitions";
private static final String PRESTO_TEMPORARY_TABLE_NAME_PREFIX = "__presto_temporary_table_";
private static final int SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE = 1009;
@wenleix Thanks for working on this. I have a few questions.
Note that it now writes exactly one file per partition, which might slow down the table write. We might want to introduce an extra local round-robin shuffle to increase the number of file writers if necessary.
One file per partition might be too little. Is there a way to still use up to 100 writer threads per node to write files?
What's the significance of 1009? Is this the maximum number of dynamic partitions or something else?
How many nodes will be used for writing? hash_partition_count or some other number?
I wonder if larger numbers of writer threads per node actually work, and if it still works on T1. One case where you would enable this is when a query writes to a large number of partitions on T1s. In that case it can use up all the available memory for the ORC encoding buffers.
I think this is useful as is because it takes a query that doesn't run at all and makes it able to run. That is strictly better than it being unable to run on T1.
Should the bucket count match the actual hash partition count so we don't have to map from a constant number of buckets to a different number of nodes actually executing the stage?
Thanks @mbasmanova and @aweisberg for the comments!
One file per partition might be too little. Is there a way to still use up to 100 writer threads per node to write files?
In the current approach that's difficult, because Presto still treats this as the "table partitioning", so the local exchange has to comply with the table partitioning.
I do agree one file per partition is very inflexible. To solve this, we might want to differentiate between "table partitioning" and "write/shuffle partitioning". For the latter, the local exchange can be a simple round robin (see the sketch below). What do you think, @arhimondr?
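To make the round-robin idea concrete, here is a minimal sketch of how a round-robin local exchange could fan incoming pages out across several file writers on a node, independent of the partition columns. The class and method names are hypothetical and not Presto code; this is only an illustration of the idea.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: pages arriving at a node are spread evenly across N local file
// writers instead of being re-hashed on the partition columns.
final class RoundRobinLocalExchangeSketch
{
    private final AtomicLong counter = new AtomicLong();
    private final int writerCount;

    RoundRobinLocalExchangeSketch(int writerCount)
    {
        this.writerCount = writerCount;
    }

    int nextWriter()
    {
        // Each incoming page goes to the next writer in turn, so a single
        // partition can be written by up to writerCount threads on the node.
        return (int) (counter.getAndIncrement() % writerCount);
    }
}
```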
What's the significance of 1009? Is this the maximum number of dynamic partitions or something else?
I chose a prime number that is large enough (>1000). The reason is that the Hive bucket function is reported to degenerate when the bucket column values follow certain patterns -- I can cc you on the internal FB post.
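To illustrate why a prime bucket count helps, here is a toy example. It models the bucket function as a plain modulo on integer keys, which is a simplification of the actual Hive bucket hashing, but it shows the failure mode: when values follow a pattern (here, multiples of a power of two), a power-of-two bucket count collapses onto a single bucket, while a large prime such as 1009 still spreads the same keys across all buckets.

```java
import java.util.HashSet;
import java.util.Set;

public class BucketCountDemo
{
    // Count how many distinct buckets a patterned key stream lands in.
    // Illustrative sketch only, not Presto's actual HiveBucketFunction.
    private static int distinctBuckets(int bucketCount)
    {
        Set<Integer> buckets = new HashSet<>();
        for (int i = 0; i < 100_000; i++) {
            int key = i * 1024;                 // patterned values: all multiples of 1024
            buckets.add(Math.floorMod(key, bucketCount));
        }
        return buckets.size();
    }

    public static void main(String[] args)
    {
        // A power-of-two bucket count collapses to a single bucket...
        System.out.println("bucketCount=1024 -> " + distinctBuckets(1024)); // 1
        // ...while a large prime such as 1009 uses every bucket.
        System.out.println("bucketCount=1009 -> " + distinctBuckets(1009)); // 1009
    }
}
```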
How many nodes will be used for writing? hash_partition_count or some other number?
I think it will be max_tasks_per_stage, but I can double-check.
I wonder if larger numbers of writer threads per node actually work, and if it still works on T1. One case where you would enable this is when a query writes to a large number of partitions on T1s. In that case it can use up all the available memory for the ORC encoding buffers.
I agree. For T1 we probably want to configure it to 1. But we might still want some flexibility in terms of how many files we can have per partition.
Should the bucket count match the actual hash partition count so we don't have to map from a constant number of buckets to a different number of nodes actually executing the stage?
No, it doesn't have to. Buckets are mapped to nodes in a random, "round robin" fashion; see NodePartitioningManager#createArbitraryBucketToNode:
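Roughly speaking (this is a simplified sketch of the idea, not the actual code in NodePartitioningManager), the mapping shuffles the node list once and then assigns the fixed number of buckets to nodes cyclically:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ArbitraryBucketToNodeSketch
{
    // Map a fixed bucket count onto however many nodes are running the stage:
    // shuffle the node list once, then assign buckets to nodes round robin.
    static <N> List<N> assignBucketsToNodes(List<N> nodes, int bucketCount)
    {
        List<N> shuffled = new ArrayList<>(nodes);
        Collections.shuffle(shuffled);            // random starting arrangement
        List<N> bucketToNode = new ArrayList<>(bucketCount);
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            bucketToNode.add(shuffled.get(bucket % shuffled.size()));   // round robin
        }
        return bucketToNode;
    }
}
```

So with 1009 buckets and, say, 50 worker nodes, each node ends up owning roughly 20 buckets, which is why the bucket count does not need to match the number of nodes executing the stage.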
CC: @biswapesh
Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);
if (!hiveBucketHandle.isPresent()) {
So this case is basically: the table hasn't already been bucketed for us by the user, so we are going to pretend it's bucketed when writing out the files?
Right. If the table is bucketed we have to follow the table bucketing; there is no other way around it.
Now that I rethink it, distinguishing between table data partitioning and table write shuffle partitioning (or some other name) might actually make the code easier to understand. Otherwise the question becomes: why is the data not actually partitioned by XX, yet we pretend it is? And this "if bucketed, use the bucketing; otherwise, use the partition columns" rule can be a bit difficult to understand and maintain (a rough sketch of the branch follows below).
cc @arhimondr
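For reference, here is a rough sketch of the branch being discussed. The types and helper names are illustrative only, not the exact code in this PR, and the layout representation is heavily simplified.

```java
import java.util.List;
import java.util.Optional;

final class InsertLayoutSketch
{
    // Sketch of the write-layout choice: a bucketed table must follow its own
    // bucketing; an unbucketed, partitioned table can instead shuffle on the
    // partition columns before write.
    static Optional<WriteLayout> chooseWriteLayout(
            Optional<Bucketing> tableBucketing,
            List<String> partitionColumns,
            boolean shuffleOnPartitionColumnsEnabled)
    {
        if (tableBucketing.isPresent()) {
            // Bucketed table: we have to follow the table bucketing, no other way around.
            return Optional.of(new WriteLayout(tableBucketing.get().columns()));
        }
        if (partitionColumns.isEmpty() || !shuffleOnPartitionColumnsEnabled) {
            return Optional.empty();
        }
        // Not bucketed: shuffle on the partition columns so each partition is
        // written by one (or a few) writers.
        return Optional.of(new WriteLayout(partitionColumns));
    }

    record Bucketing(List<String> columns) {}

    record WriteLayout(List<String> shuffleColumns) {}
}
```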
@@ -2396,9 +2416,31 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
validatePartitionColumns(tableMetadata);
Is this for when they create the table at the same time as the select, and we want to make sure we write it out as if it were bucketed? In other words, it won't ever come back and call getInsertLayout before inserting?
aweisberg left a comment
Just had some questions about how this works in practice.
Also wondering about the hard-coded 1009 bucket count.
I am interested in seeing this in action as we work on tuning the queries that hit this issue.
@wenleix Wenlei, thanks for explaining.
I like this proposal.
Superseded by #14010