feat(native): Support sorted by during write to Iceberg tables #26182
PingLiuPing wants to merge 6 commits into prestodb:master
Reviewer's Guide
This PR introduces support for sorted-by ordering when writing to Iceberg tables by enabling a native insertion path behind a compile-time flag, parsing and propagating sort fields through the Velox connector, and updating the Java metadata and sink implementations to be session-aware and robust for partition transforms and JSON decimal handling.
ER diagram for PartitionTransformType enum changes
erDiagram
PartitionTransformType {
IDENTITY
HOUR
DAY
MONTH
YEAR
BUCKET
TRUNCATE
}
Class diagram for updated IcebergPrestoToVeloxConnector and related types
classDiagram
class IcebergPrestoToVeloxConnector {
+toVeloxInsertTableHandle(createHandle, typeParser, pool)
+toVeloxInsertTableHandle(insertHandle, typeParser, pool)
+toIcebergColumns(inputColumns, typeParser, hasPartitionColumn)
+toIcebergSortingColumns(sortFields, schema)
+toVeloxIcebergPartitionField(field, typeParser, schema)
+toVeloxIcebergPartitionSpec(spec, typeParser)
}
class HivePrestoToVeloxConnector {
+toVeloxInsertTableHandle(createHandle, typeParser, pool)
+toVeloxInsertTableHandle(insertHandle, typeParser, pool)
}
class PrestoToVeloxConnector {
+toVeloxInsertTableHandle(createHandle, typeParser, pool)
+toVeloxInsertTableHandle(insertHandle, typeParser, pool)
}
IcebergPrestoToVeloxConnector --|> PrestoToVeloxConnector
HivePrestoToVeloxConnector --|> PrestoToVeloxConnector
IcebergPrestoToVeloxConnector o-- "IcebergColumnHandle"
IcebergPrestoToVeloxConnector o-- "IcebergSortingColumn"
IcebergPrestoToVeloxConnector o-- "IcebergPartitionSpec"
IcebergPrestoToVeloxConnector o-- "TypeParser"
IcebergPrestoToVeloxConnector o-- "MemoryPool"
HivePrestoToVeloxConnector o-- "TypeParser"
HivePrestoToVeloxConnector o-- "MemoryPool"
PrestoToVeloxConnector o-- "TypeParser"
PrestoToVeloxConnector o-- "MemoryPool"
Class diagram for updated IcebergColumnHandle Java class
classDiagram
class IcebergColumnHandle {
+create(column, typeManager, columnType)
+create(partitionFieldId, column, typeManager, columnType)
+getPushedDownSubfield(column)
-columnIdentity
-type
-doc
-columnType
}
  const protocol::CreateHandle* createHandle,
- const TypeParser& typeParser) const {
+ const TypeParser& typeParser,
+ memory::MemoryPool* pool) const {
Why is the memory pool parameter added? It doesn't seem to be used. A memory pool should be used for allocations made by operators, not during this conversion.
Thanks for the comment.
The MemoryPool is needed by the Iceberg connector: when applying a partition transform, we need to create new vectors to hold the transformed values.
It is not needed here in the Hive connector, but since both the Hive and Iceberg connectors inherit from the base class and override toVeloxInsertTableHandle, I added the parameter to the base class.
if (partitionValue.isLong()) {
    return BigDecimal.valueOf(partitionValue.asLong(), ((DecimalType) type).scale());
}
else if (partitionValue.isInt()) {
Can you add a test for this logic?
Sure, I think I have a test case that covers this; let me find it.
Yes, I have a test case that covers this.
It can also be tested easily in presto-cli. The logic is as follows: in Velox, a decimal is represented as an integer (int64/int128). When the partition value is passed back to Presto, it is encoded as JSON, and when that value is decoded, a small integer is treated as int32.
presto:iceberg> insert into identity_t2 values(1, -123, cast('89.124' AS decimal(5,3)), 'ABCD QWERT', x'455843454C4C454E54');
INSERT: 1 row
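The decoding issue described above can be reproduced with plain `java.math.BigDecimal`. This is a minimal sketch, not the PR's code: the class and method names are hypothetical, and it assumes the JSON layer hands back the unscaled value as either an `int` or a `long` depending on its magnitude. For `decimal(5,3)`, the value 89.124 travels as the unscaled integer 89124, which fits in 32 bits, so a branch that accepts only 64-bit values would miss it.

```java
import java.math.BigDecimal;

class DecimalPartitionValueSketch {
    // Hypothetical helper: rebuild a decimal from its unscaled integer form.
    // An int argument widens to long, so small values take the same path.
    static BigDecimal fromUnscaled(long unscaledValue, int scale) {
        return BigDecimal.valueOf(unscaledValue, scale);
    }

    public static void main(String[] args) {
        int smallUnscaled = 89124;          // decoded as a 32-bit integer
        long largeUnscaled = 123456789012L; // too large for 32 bits
        System.out.println(fromUnscaled(smallUnscaled, 3));
        System.out.println(fromUnscaled(largeUnscaled, 3));
    }
}
```

Both calls go through the same `BigDecimal.valueOf(long, int)` overload, which is why handling the int case separately only requires widening the decoded value before applying the scale.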
Thanks for the comment.
This is a mistake, and the change is a duplicate. It must have been caused by multiple rebases and merges while I maintained these branches locally. I've reverted all the changes in this file.
47e676e to 7bea642
For the release-notes pipeline error, I opened issue #26222.
yingsu00 left a comment
Approved, contingent on the mvn failure being resolved.
71ca098 to 7bea642
You may rebase to fix the CI now.
7bea642 to f8dfb27
std::vector<connector::hive::iceberg::IcebergSortingColumn> sortedBy;
sortedBy.reserve(sortFields.size());
for (const auto& sortField : sortFields) {
  velox::core::SortOrder veloxSortOrder(
Can you please leave a comment stating that this matches asc/desc and nulls first/nulls last?
Thanks for the comment. Let me add a comment.
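The mapping the reviewer asks to have documented can be sketched as follows. Java is used only to keep the example self-contained; the enum, class, and method names are hypothetical and are not the Velox or Iceberg API. The sketch assumes the target sort order is described by two flags, ascending and nulls-first, so each combination of direction and null ordering maps to exactly one pair.

```java
class SortOrderSketch {
    enum Direction { ASC, DESC }
    enum NullOrder { NULLS_FIRST, NULLS_LAST }

    // Hypothetical two-flag sort order, standing in for the engine's type.
    static final class SortFlags {
        final boolean ascending;
        final boolean nullsFirst;

        SortFlags(boolean ascending, boolean nullsFirst) {
            this.ascending = ascending;
            this.nullsFirst = nullsFirst;
        }
    }

    // ASC maps to ascending == true, NULLS_FIRST maps to nullsFirst == true.
    static SortFlags toSortFlags(Direction direction, NullOrder nullOrder) {
        return new SortFlags(
                direction == Direction.ASC,
                nullOrder == NullOrder.NULLS_FIRST);
    }

    public static void main(String[] args) {
        SortFlags flags = toSortFlags(Direction.DESC, NullOrder.NULLS_LAST);
        System.out.println(flags.ascending + " " + flags.nullsFirst);
    }
}
```

Spelling the mapping out in a comment next to the constructor call guards against silently swapping the two boolean arguments.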
  private:
- std::vector<std::shared_ptr<const velox::connector::hive::HiveColumnHandle>>
+ std::vector<std::shared_ptr<const velox::connector::hive::iceberg::IcebergColumnHandle>>
Is this a bug? Did you intentionally leave HiveColumnHandle in the previous commit?
Thanks for the comment.
Yes, the commit here matches the code in Velox. IcebergColumnHandle was added to Velox later, so I changed this to match the Velox code.
When I test the code, I pair the PRs as follows:
Presto PR1 -> Velox PR 1
Presto PR2 -> Velox PR 2
f8dfb27 to ba1006b
ba1006b to 2c494e9
Description
Support sorted by when writing to Iceberg tables.
Reviewers, please review only the last commit of this PR.