feat: Support iceberg partition transform #15035
Conversation
@mbasmanova Could you help take a look at this PR? Thank you.
rui-mo left a comment:
Thanks!
std::vector<HiveColumnHandlePtr> inputColumns,
LocationHandlePtr locationHandle,
dwio::common::FileFormat tableStorageFormat,
IcebergPartitionSpecPtr partitionSpec = nullptr,
Can we add a test to confirm that this spec controls the write directory as intended? It looks like the tests only check that the field parameters can be correctly set for now.
Thanks for the comment.
This PR only adds the spec; it is not actually used yet, so we cannot add a test to cover this scenario now.
In this PR https://github.com/facebookincubator/velox/pull/13874/files#diff-493592c0098c71afd3f75c22e49df793e1d32631dc51312d853a2fd4d139a9ec, I have added lots of test cases to cover the partition folder names, and I will split them into smaller PRs one by one.
@rui-mo Does this make sense to you? Thanks.
I have added lots of test cases to cover the partition folder names.
Sounds reasonable to me to add the tests in follow-up PRs, thanks.
Force-pushed 2d7689f to e465df4.
rui-mo left a comment:
Thanks. Looks good overall.
Force-pushed 255954f to 97b4512.
@mbasmanova Could you help take a look? @rui-mo has reviewed and approved the PR.
std::optional<common::CompressionKind> compressionKind = {},
const std::unordered_map<std::string, std::string>& serdeParameters = {});

IcebergPartitionSpecPtr partitionSpec() const {
use const& for return value
@PingLiuPing I'm looking at the first commit and somehow I don't see these old comments addressed. Would you double check?
@mbasmanova Oh, I made a mistake: I committed the fix in the second commit. I have now combined the two commits into one.
/// - kIdentity: Use the source value as-is (no transformation).
/// - kHour/kDay/kMonth/kYear: Extract time components from timestamps/dates.
/// - kBucket: Hash the value into N buckets for even distribution.
/// - kTruncate: Truncate strings/numbers to a specified width.
Would you move these comments next to enum values?
enum class TransformType {
/// Use the source value as-is (no transformation)
kIdentity,
/// Extract time components from timestamps/dates.
kHour,
kDay,
kMonth,
kYear,
/// Hash the value into N buckets for even distribution.
kBucket,
...
Thanks.
/// represented by a named Field.
///
/// The partition spec defines:
/// - Unique partition spec ID for versioning and evolution.
drop "partition spec" : Unique ID for ...
Thanks.
///
/// The partition spec defines:
/// - Unique partition spec ID for versioning and evolution.
/// - Which source columns to use for partitioning.
How is this specified? Does Field.name identify the source column by name?
Should we add any sanity checks to verify that fields is not empty and doesn't contain duplicates?
@mbasmanova Thanks for the comment.
The partition field is defined by the upstream engine, such as Presto or Spark.
For example, in Presto:
create table month_t1 (c_int int, c_date date, c_bigint bigint) with (format='PARQUET', partitioning=ARRAY['month(c_date)']);
The above DDL defines a month partition transform whose source column is c_date.
Such information is processed by the Iceberg Java library, e.g. checking the column type, column duplication, etc., and it is eventually passed into Velox. Ideally, I would use the field ID to identify a source column, which is what the Iceberg spec requires, but Velox RowType only supports matching fields by name, so I use the field name here.
Should we add any sanity checks to verify that fields is not empty and doesn't contain duplicates?
I think it is not necessary; such scenarios are already handled by the upstream Iceberg library.
But Velox RowType only supports matching fields by name, so I use the field name here.
Got it. Would be nice to clarify in a comment.
I think it is not necessary; such scenarios are already handled by the upstream Iceberg library.
Velox is a separate component and as such it needs to validate external inputs. It cannot simply trust that the caller provides valid input. Without proper validation it is hard to troubleshoot issues as it is not clear whether the bug is in the library (Velox) or the application (Spark).
Thanks, makes sense. I will add some logic to validate the inputs.
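A minimal sketch of such validation, assuming the struct shape discussed in this thread (fields carrying a name and a transform); the helper name and member names here are assumptions, not the final API. VELOX_USER_CHECK and fmt::format follow the usual Velox conventions:

// Assumed headers: <unordered_set>, fmt/format.h,
// velox/common/base/Exceptions.h.
void validatePartitionSpec(const IcebergPartitionSpec& spec) {
  // External input must be checked explicitly; do not trust the caller.
  VELOX_USER_CHECK(
      !spec.fields.empty(), "Partition spec must contain at least one field.");
  std::unordered_set<std::string> seen;
  for (const auto& field : spec.fields) {
    // Iceberg allows the same source column under different transforms,
    // so reject duplicate (column, transform) pairs rather than names.
    const auto key = fmt::format(
        "{}/{}", field.name, TransformTypeName::toName(field.transform));
    VELOX_USER_CHECK(
        seen.insert(key).second, "Duplicate partition field: {}", key);
  }
}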
columnHandles,
locationHandle,
fileFormat_,
nullptr,
Would you add argument name as a comment to improve readability?
/*foo=*/nullptr
Thanks, will do.
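Applied to the call quoted above, using the parameter name from the constructor signature earlier in this thread, this would read:

columnHandles,
locationHandle,
fileFormat_,
/*partitionSpec=*/nullptr,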
namespace facebook::velox::connector::hive::iceberg {
namespace {

class PartitionSpecTest : public ::testing::Test {};
Seems like this class is not needed. Just use TEST instead of TEST_F.
Thanks, you are right.
EXPECT_EQ("trunc", TransformTypeName::toName(TransformType::kTruncate));
}

TEST_F(PartitionSpecTest, basic) {
What are we testing here? Seems redundant.
Thanks, will remove this case.
EXPECT_FALSE(spec.fields[1].parameter.has_value());
}

TEST_F(PartitionSpecTest, withParameters) {
ditto
Thanks. I wanted to test the two special transforms, bucket and truncate, which take an additional parameter.
I think I will remove all these tests when I combine this PR with the actual partition transform.
It is hard to see what the value of this particular test is. It seems to be verifying that the IcebergPartitionSpec::Field struct was initialized properly. But there is no logic to test... Am I missing something?
Yes, you are correct. This test file was added when I split the PR into smaller pieces; it did not exist when I submitted #13874. I will delete this test file when I combine the actual partition logic into this PR. It does not make sense to test the struct initialization.
@PingLiuPing It looks like this PR just adds a new struct but doesn't use it. Is this correct? Might be better to combine this PR with the one that introduces the functionality that uses this struct.
@mbasmanova Yes, in this PR I only add the basic data structures (PartitionSpec) that will be used later. The partition transform operations will be a huge PR, and I'm thinking of splitting it into several standalone PRs.
Let's include this spec in the first PR.
Force-pushed 58c82c3 to d2c3edf.
@mbasmanova Thanks for your review comments. I have integrated the partition spec with Iceberg.
Force-pushed d2c3edf to 67dba5c.
/// @param serdeParameters Additional serialization/deserialization parameters
/// for the file format.
IcebergInsertTableHandle(
    std::vector<HiveColumnHandlePtr> inputColumns,
IcebergInsertTableHandle takes HiveColumnHandlePtr as columns. HiveColumnHandle identifies which column is a partition key and which is not. It looks like in Iceberg this definition is not sufficient. Should we stop using HiveColumnHandle and introduce a separate IcebergColumnHandle? Otherwise, I assume we would have to document (and check) that kPartitionKey should never be used with Iceberg table columns.
class HiveColumnHandle : public ColumnHandle {
public:
/// NOTE: Make sure to update the mapping in columnTypeNames() when modifying
/// this.
enum class ColumnType {
kPartitionKey,
kRegular,
kSynthesized,
/// A zero-based row number of type BIGINT auto-generated by the connector.
/// Rows numbers are unique within a single file only.
kRowIndex,
kRowId,
};
@mbasmanova Thanks for the insightful comment.
So far, HiveColumnHandlePtr is still OK; we only need to get which columns are partition-key columns from it. But we will need to add IcebergInsertTableHandle in a following PR (when collecting Iceberg data file stats).
// Convert a partition value to its string representation for use in
// partition directory path. The format follows the Iceberg specification
// for partition path encoding.
class IcebergPartitionUtil : public HivePartitionUtil {
perhaps, IcebergPartitionUtil -> IcebergPartitionPath
Thanks.
Force-pushed f6b3eb2 to 96a4c26.
@mbasmanova Thank you for taking the time to review this PR. I've addressed your comments and rebased the branch.
@PingLiuPing Thank you for iterating. GitHub is slow on this PR. Wondering if this is because there are lots of changes or comments or both. Wondering if there is a way to extract smaller PRs from it.

I'm still trying to understand the big picture. I'm reading this doc: https://iceberg.apache.org/docs/latest/partitioning/#icebergs-hidden-partitioning Let me know if there are better resources.

My understanding at this point is that the partitioning configuration consists of one or more transforms. Each transform takes a single column as input. Different transforms can use the same or different columns as inputs. Each transform produces a partitioning key (hidden column). The values of all partition keys are used to generate the partition directory.

The inputs to transforms are columns in the table. A CTAS or INSERT INTO query produces a set of output columns, which are mapped to columns in the table schema. The names of query output columns (input to PartitionIdGenerator and the TableWrite operator) do not necessarily match the names in the table schema. Hence, InsertTableHandle (or similar) must provide the mappings: index of input column -> schema column.

Is this about right?
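For a concrete illustration of that flow: with a spec like month(c_date), a row whose c_date is 2024-03-15 produces the hidden partition value 2024-03, and per Iceberg's partition-path encoding the file would land under a directory roughly like c_date_month=2024-03/. The derived field name and value format here come from the Iceberg spec and its Java library defaults, not from this PR.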
if (field.parameter.has_value()) {
  exprArgs.emplace_back(
      std::make_shared<core::ConstantTypedExpr>(
          INTEGER(), variant(field.parameter.value())));
variant -> Variant
Thanks.
    exprSet_.get(), rows, *input, results);

// Verify that all expressions preserved the vector size.
for (auto i = 0; i < numExpressions; i++) {
This loop seems redundant.
Thanks.
/// Provides static methods to build expression trees from Iceberg partition
/// specification. These expressions can be compiled and evaluated using
/// Velox's expression evaluation framework.
class TransformExprBuilder {
Let's move this to its own file. Would it make sense to extract this class into a separate PR?
Thanks.
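For context, a rough sketch of what such a builder might emit for a bucket transform, using Velox typed expressions as in the diff above. The registered function name "bucket" and the helper's shape are assumptions for illustration, not the PR's actual code:

// Assumed headers: velox/core/Expressions.h, velox/type/Variant.h.
// Builds bucket(numBuckets, column) as a call expression that can be
// compiled into an exec::ExprSet and evaluated like any other expression.
core::TypedExprPtr makeBucketExpr(
    const TypePtr& sourceType,
    const std::string& columnName,
    int32_t numBuckets) {
  std::vector<core::TypedExprPtr> args;
  args.emplace_back(std::make_shared<core::ConstantTypedExpr>(
      INTEGER(), Variant(numBuckets)));
  args.emplace_back(std::make_shared<core::FieldAccessTypedExpr>(
      sourceType, columnName));
  // Bucket transforms produce a 32-bit bucket ordinal.
  return std::make_shared<core::CallTypedExpr>(
      INTEGER(), std::move(args), "bucket");
}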
namespace facebook::velox::connector::hive::iceberg {

/// Utility class for converting Iceberg partition specification to Velox
Utility class for converting
replace with "Converts ..."; aim to start comments with active verbs
There is a lot of repetition in the comments. Please, revise.
Thanks. Revised.
///
/// The partition spec defines:
/// - Unique ID for versioning and evolution.
/// - Which source columns to use for partitioning (identified by field name,
source columns
The 'source' here actually means column in the table schema... I was confused about this for quite some time.
Refined a bit.
const auto& name = field.name;
if (!isValidPartitionType(type)) {
  VELOX_USER_FAIL(
      "Type '{}' is not supported as a partition column.", type->name());
Make sure to put runtime information at the end of the messages. This makes it easier to grep for errors that appear in prod. Please, update throughout.
"Type is not supported as a partition column: {}"
Thanks, will check all patterns.
    const override;

 private:
  TransformType transformType_;
const
Thanks.
/// @return RowVector with one column per transformed column, columns in same
/// order as IcebergPartitionSpec::fields. Returns nullptr if no partitions
/// have been created.
RowVectorPtr partitionKeys() const {
use const & for return type
Thanks.
template <TypeKind Kind>
std::pair<std::string, std::string> makePartitionKeyValueString(
    const IcebergPartitionUtilPtr& formatter,
    const BaseVector* partitionVector,
change raw pointer to const &
Thanks.
  TransformType transformType_;
};

using IcebergPartitionUtilPtr = std::shared_ptr<const IcebergPartitionPath>;
IcebergPartitionUtilPtr -> IcebergPartitionPathPtr
Thanks.
/// partition column.
///
/// @return RowVector with one column per transformed column, columns in same
/// order as IcebergPartitionSpec::fields. Returns nullptr if no partitions
Returns nullptr if no partitions
Is this accurate? The implementation suggests that return value is never nullptr.
Oh, you are right. Removed.
rowType_ = ROW(std::move(partitionKeyNames), std::move(partitionKeyTypes));

partitionValues_ = BaseVector::create<RowVector>(
We have a getter that returns partitionValues_ as is. It seems this vector may have more rows than are actually valid / populated. Is this intentional?
Thanks for the comment. Yes, it is intentional, since the maximum number of rows is maxPartitions_.
I refined this to resize dynamically.
@mbasmanova Yes, your understanding is perfect. Only one small thing to highlight: not all transforms can be applied to the same column. There are some subtle restrictions on this; see the comment in PartitionSpec.h.
When implementing the partition transforms, I primarily referenced the Iceberg spec directly: https://iceberg.apache.org/spec/#partitioning
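For reference, the source-type restrictions in that spec are roughly as follows (see the spec for the authoritative table):
- identity: any primitive type.
- bucket[N]: int, long, decimal, date, time, timestamp, string, uuid, fixed, binary.
- truncate[W]: int, long, decimal, string, binary.
- year, month, day: date and timestamp.
- hour: timestamp only.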
Yes, I think so. It is just hard to make each PR integrate with the existing code and provide new functionality on its own. I think I can submit a few separate PRs as preliminary groundwork and then another PR that ties them together and integrates with the existing code. Would that be OK?
mbasmanova left a comment:
@PingLiuPing Do you think it is possible to extract PartitionSpec, TransformEvaluator and IcebergPartitionPath into a separate PR?
That would be great. Thank you for being so accommodating. At this point, I feel that most if not all major questions have been resolved, and I expect we can proceed with final reviews quickly. Having smaller PRs would help quite a bit.
I really appreciate your time and feedback. That's very encouraging; I'll split the PR and submit the first one shortly.
Force-pushed 96a4c26 to 102596e.
This is a continuation of the previous PR #14723.
Implements comprehensive partition transform functionality for Iceberg tables, enabling data to be partitioned using various transform functions, including identity, bucket, truncate, and temporal transforms (year, month, day, hour).
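As an illustration, constructing such a spec for the Presto DDL example earlier in the thread might look roughly like this. The member names (specId, fields, name, transform, parameter) follow the struct discussed in review and are assumptions, not the final API:

// month(c_date) plus bucket(16, c_int).
IcebergPartitionSpec spec;
spec.specId = 0; // Unique ID for versioning and evolution.
spec.fields.push_back(
    {/*name=*/"c_date", TransformType::kMonth, /*parameter=*/std::nullopt});
spec.fields.push_back(
    {/*name=*/"c_int", TransformType::kBucket, /*parameter=*/16});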