Support shuffle on Hive partition columns before write #14010
wenleix merged 2 commits into prestodb:master
Conversation
Supersedes #13969
cc @mbasmanova, @kaikalur, @aweisberg
CC: @biswapesh |
!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()?
@highker: Wondering if you can take a look at the SPI change?
Thanks @highker for the review. Comments addressed 😃
@wenleix I don't understand this TODO. It seems to me that bucketed tables are handled above and would never reach that code. Would you clarify?
@mbasmanova: Sorry for the confusion; I mean we don't have to use HivePartitionHandle for the shuffle partitioning. I'm thinking about implementing a different HiveShufflePartitioningHandle that distributes the keys more uniformly (the Hive bucket function is not that great :) )
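For illustration only, here is one way a "distribute the keys more uniformly" bucket function could look. Everything below is a hypothetical sketch, not the PR's code: the class name, the getBucket(List<String>) signature, and the FNV-1a/SplitMix64 hashing are assumptions chosen to contrast with Hive's additive, String.hashCode-style bucket hash.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical sketch: route a row to a writer based on its Hive partition key values,
// using a 64-bit mix rather than Hive's additive string hash, which tends to cluster
// similar-looking keys (e.g. consecutive dates) into nearby buckets.
public final class ShufflePartitionBucketFunction
{
    private final int bucketCount;

    public ShufflePartitionBucketFunction(int bucketCount)
    {
        this.bucketCount = bucketCount;
    }

    public int getBucket(List<String> partitionValues)
    {
        long hash = 0xcbf29ce484222325L;            // FNV-1a offset basis
        for (String value : partitionValues) {
            for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
                hash = (hash ^ b) * 0x100000001b3L; // FNV-1a step per byte
            }
            hash = mix64(hash);                     // extra avalanche per column value
        }
        return (int) Math.floorMod(hash, (long) bucketCount);
    }

    // Finalizer borrowed from the well-known SplitMix64 mix function
    private static long mix64(long z)
    {
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }

    public static void main(String[] args)
    {
        ShufflePartitionBucketFunction function = new ShufflePartitionBucketFunction(8);
        // Rows with identical partition values always land in the same bucket (writer),
        // while nearby dates spread across buckets instead of clustering.
        System.out.println(function.getBucket(List.of("2019-12-01", "US")));
        System.out.println(function.getBucket(List.of("2019-12-01", "US")));
        System.out.println(function.getBucket(List.of("2019-12-02", "US")));
    }
}
```

The only property the table writer actually needs is that two rows with the same partition values map to the same bucket; the mixing step is purely about spreading distinct partition keys evenly across writers.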
mbasmanova left a comment
@wenleix I'd like to get a better understanding of the effects of this change. With shuffle_partitioned_columns_for_table_write_enabled=true, the data will be shuffled on partition columns and the TableWriter operator will run on as many nodes as there are available in the cluster, each node processing a distinct set of partitions. E.g. all data for a single partition will be written by the same node. That node may also write a few other partitions. Within the node, there will be at least 4 threads, but could be more if the node writes data for multiple partitions, up to 100 (TBD: config name). Hence, this property allows writing up to #-nodes x 100 partitions in a single query and avoids making very small files. Is this accurate?
@mbasmanova: Thanks for the review!
Correct.
It's configurable via
That's accurate. Although I am only thinking of writing to at most a few thousand partitions in practice 😃
@wenleix Thanks for explaining. Sounds great! Can't wait to give it a try.
d43d4ce to df27916
Previously, a writing worker would receive rows from all partitions, and thus could write at most hive.max-partitions-per-writers partitions. This session property allows shuffling on the partition columns when writing to partitioned, unbucketed Hive tables. As a result, rows in the same partition will be sent to the same writing worker. This increases the maximum number of partitions written in a single query by a factor of the total number of writing workers.
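To put rough, purely illustrative numbers on that last sentence (the cluster size is assumed; the 100-partitions-per-writer figure is the limit mentioned earlier in the thread): on a 20-node cluster where hive.max-partitions-per-writers caps each writing worker at 100 partitions, shuffling on the partition columns lets a single INSERT address on the order of 20 x 100 = 2,000 partitions, whereas without the shuffle every worker can receive rows from any partition and the whole query stays bounded by the single-worker limit of 100.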