fix: Remove redundant partition value handling in Iceberg column adaptation #14516

Conversation

✅ Deploy Preview for meta-velox canceled.
@majetideepak Can you help review this PR? Thank you very much.
@zhli1142015 I think PR #12910 (a dependency of this PR) will be merged soon.
Force-pushed from a372d99 to 90c79bb.
@mbasmanova The purpose of this PR is to address certain limitations in the Iceberg reader. When a partition column is defined in an Iceberg table, we cannot reuse the Hive reader to query that table. This is because the Hive writer writes only non-partition columns to data files, and the partition column values are derived from the partition map during the query phase (there is special logic in the Hive reader for this). In contrast, Iceberg requires all columns (both partition and non-partition) to be written to data files, so we do not need that special logic when reading. Moreover, for most Iceberg partition transforms it is not possible to reconstruct the original column value from the partition value.
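To illustrate that last point, here is a minimal, hypothetical C++ sketch (not Velox code; Iceberg's real bucket transform uses a Murmur3 hash, and `std::hash` is only a stand-in) showing why a bucket-partitioned column cannot be reconstructed from its partition value:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>

// Stand-in for Iceberg's bucket transform (the real spec uses Murmur3).
// The mapping is many-to-one, so it cannot be inverted.
int32_t bucketOf(int64_t value, int32_t numBuckets) {
  return static_cast<int32_t>(std::hash<int64_t>{}(value) % numBuckets);
}

int main() {
  // Distinct column values can land in the same bucket, so populating the
  // column from the partition value (as the Hive read path does for
  // identity-partitioned columns) would produce wrong results for Iceberg.
  std::cout << bucketOf(1001, 16) << " " << bucketOf(2042, 16) << "\n";
  return 0;
}
```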
      VectorPtr& output,
      const std::vector<BaseVector::CopyRange>& ranges);

  void setPartitionValue(
Would you document this method?
Thanks. I added the documentation.
velox/connectors/hive/SplitReader.h
Outdated
- std::vector<TypePtr> adaptColumns(
+ virtual std::vector<TypePtr> adaptColumns(
      const RowTypePtr& fileType,
      const std::shared_ptr<const velox::RowType>& tableSchema) const;
nit: RowTypePtr
Thanks.
  uint64_t next(uint64_t size, VectorPtr& output) override;

 private:
  std::vector<TypePtr> adaptColumns(
Would you document this method?
Thanks. I added documentation to this method.
 private:
  std::vector<TypePtr> adaptColumns(
      const RowTypePtr& fileType,
      const std::shared_ptr<const velox::RowType>& tableSchema) const override;
RowTypePtr
Thanks.
  for (auto i = 0; i < childrenSpecs.size(); ++i) {
    auto* childSpec = childrenSpecs[i].get();
Can this be a for-each loop?
for (const auto& child : childrenSpecs)
Yes.
  HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'");
}

TEST_F(HiveIcebergTest, testReadDecimal) {
Drop the 'test' prefix from test method names (please also update pre-existing method names in this file).
Thanks, updated.
}

TEST_F(HiveIcebergTest, testReadDecimal) {
  RowTypePtr rowType{ROW({"c0", "price"}, {BIGINT(), DECIMAL(10, 2)})};
auto rowType = ROW(...)
Thanks.
  VectorPtr expectedPrice =
      makeFlatVector<int64_t>({12345, 12345, 12345}, DECIMAL(10, 2));
  if (i == 1) {
    expectedPrice =
        makeFlatVector<int64_t>({67890, 67890, 67890}, DECIMAL(10, 2));
  }
this code is confusing; consider rewriting
std::vector<int64_t> unscaledPartitionValues = {12345, 67890};
...
auto expectedPrice =
makeFlatVector<int64_t>(3, [&](auto) {return unscaledPartitionValues[i];}, DECIMAL(10, 2));
Thanks, I re-implemented the test to more accurately cover the code changes.
  std::make_shared<HiveColumnHandle>(
      "c0",
      HiveColumnHandle::ColumnType::kRegular,
      rowType->childAt(0),
      rowType->childAt(0))});
Add a helper method to reduce copy-paste. Update existing code in this file as well.
makeColumnHandle("c0", rowType->childAt(0));
makePartitionKeyHandle("price", rowType->childAt(1));
Thanks. Added makeColumnHandles to create the column handle map.
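For reference, here is a minimal sketch of what such helpers might look like, assuming the HiveColumnHandle constructor shape from the snippet above and that ColumnType has a kPartitionKey member; the PR ultimately added a makeColumnHandles helper instead, so these exact names are hypothetical:

```cpp
// Hypothetical helpers; names follow the reviewer's suggestion, and the
// HiveColumnHandle constructor arguments mirror the snippet above.
std::shared_ptr<HiveColumnHandle> makeColumnHandle(
    const std::string& name,
    const TypePtr& type) {
  return std::make_shared<HiveColumnHandle>(
      name, HiveColumnHandle::ColumnType::kRegular, type, type);
}

std::shared_ptr<HiveColumnHandle> makePartitionKeyHandle(
    const std::string& name,
    const TypePtr& type) {
  return std::make_shared<HiveColumnHandle>(
      name, HiveColumnHandle::ColumnType::kPartitionKey, type, type);
}
```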
  assertQuery(plan, splits, "SELECT * FROM tmp", 0);
}

TEST_F(HiveIcebergTest, testAddNewColumn) {
These tests are quite repetitive. Consider refactoring to reduce boilerplate and copy-paste.
Thanks.
Which error do you meet? Does it only occur for decimal columns? Why?
std::vector<TypePtr> IcebergSplitReader::adaptColumns(
    const RowTypePtr& fileType,
    const std::shared_ptr<const velox::RowType>& tableSchema) const {
  std::vector<TypePtr> columnTypes = fileType->children();
Please respect infoColumns; Iceberg may also have infoColumns such as _delete.

I don't see an error in the Gluten unit tests; do you have a unit test in Presto to reproduce it?

@jinchengchenghh With or without IBM#425?
Thanks, I think I need to change the PR description a little bit. When I opened this PR, #12910 had not been merged, and at that time there were errors when reading a decimal partition column. But the second point in the PR description stands. See #14516 (comment).
In Gluten, we test two modes, with or without IBM#425; both of them pass all the tests.

Ok, without IBM#425, do you have a test case that queries a partitioned Iceberg table?

Yes, this is a feature added long ago; our customers also use it.
  writeToFile(
      dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
This code repeats. Consider adding a helper method to shorten it to
writeToFile(dataFilePath, dataVectors);
Thanks.
  auto icebergSplits = makeIcebergSplits(dataFilePath->getPath());

  // Read with new schema (c0, c1, and c2).
  auto plan = PlanBuilder(pool_.get())
Drop the pool_.get() parameter. It is not needed.
Thanks.
  auto expectedRegion = makeFlatVector<std::string>({"US", "US", "US"});
  auto expectedYear = makeFlatVector<int32_t>({2025, 2025, 2025});
  std::vector<RowVectorPtr> expectedVectors;
  expectedVectors.push_back(makeRowVector(
consider
makeRowVector(tableRowType->names(), {
makeFlatVector<std::string>({"US", "US", "US"}),
makeFlatVector<int32_t>({2025, 2025, 2025}),
})
No need for expectedRegion and expectedYear variables. Also, no need to repeat column names.
Thanks.
Force-pushed from 58a3a11 to 7ffa060.
velox/connectors/hive/SplitReader.h
Outdated
namespace facebook::velox::connector::hive {

/// Creates a constant vector from a string representation of a value.
nit: a constant vector of size 1
Thanks.
velox/connectors/hive/SplitReader.h
Outdated
/// Creates a constant vector from a string representation of a value.
///
/// This function is primarily used to materialize partition column values and
This function is primarily used
Drop "This function is primarily". Start with an active verb: Used to ...
Thanks.
velox/connectors/hive/SplitReader.h
Outdated
/// Creates a constant vector from a string representation of a value.
///
/// This function is primarily used to materialize partition column values and
/// info columns (e.g., $path, $file_size) when reading Hive/Iceberg tables.
Hive/Iceberg -> Hive and Iceberg
Thanks.
velox/connectors/hive/SplitReader.h
Outdated
/// the same way as CAST(x as VARCHAR). Date values must be formatted using ISO
/// 8601 as YYYY-MM-DD. If nullopt, creates a null constant vector.
/// @param pool Memory pool for allocating the constant vector.
/// @param asLocalTime If true and type is TIMESTAMP, interprets the string
Love this documentation. Detailed and clear. Thank you for taking the time to write it.
Cheers.
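For context, here is a usage sketch pieced together from the doc fragments quoted in this thread; the full signature is not shown here, so the parameter order and exact argument set are assumptions rather than the confirmed API:

```cpp
// Hypothetical usage of newConstantFromString, reconstructed from the doc
// fragments above; the parameter order and argument set are assumptions.
auto dateConstant = newConstantFromString(
    DATE(),                                    // target type
    std::optional<std::string>("2018-04-06"),  // ISO 8601 date, per the docs
    pool,                                      // memory pool for the vector
    /*asLocalTime=*/false,                     // only relevant for TIMESTAMP
    /*isPartitionDateDaysSinceEpoch=*/false);  // DATE given as a string here
```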
- HiveConnectorTestBase::assertQuery(
+ auto plan =
+     PlanBuilder(pool_.get()).tableScan(rowType, {}, "", nullptr).planNode();
drop pool_.get() argument; ditto other places
@mbasmanova Thanks.
I removed most of them; there is one place left where the pool cannot be removed:

// Test filter on non-partitioned date column
std::vector<std::string> filters = {"ds = date'2018-04-06'"};
plan = PlanBuilder(pool_.get()).tableScan(rowType, filters).planNode();

When I removed it, the test case crashed at line 392 in 6851fd3:

pool->preferredSize(checkedPlus<size_t>(size, kPaddedSize));

But reading this case carefully, I don't think it makes sense. For Iceberg:
- There is no difference between reading a partitioned table and a non-partitioned table.
- Even if we wanted to test reading a partitioned table, we should first write a partitioned Iceberg table, but this case does not write a partitioned table at all.
So this case can probably be deleted; what do you think?
I'm not sure I follow.
  expectedVectors.push_back(makeRowVector(
      {c0,
       makeNullConstant(TypeKind::INTEGER, 3),
       makeNullConstant(TypeKind::VARCHAR, 3)}));
Add a comma after the last vector to improve readability. Apply the same refactoring to other calls to makeRowVector.
Thanks.
There are places where a vector is used twice, so I initially kept the local variable.
I have now removed all the local vector variables and fetch them from the row vector where they are used a second time.
mbasmanova
left a comment
Looks great. Thank you for iterating.
Force-pushed from 7ffa060 to ced9ba5.
    const std::optional<std::string>& value,
    velox::memory::MemoryPool* pool,
    bool asLocalTime,
    bool isPartitionDateDaysSinceEpoch) {
Let's rename these arguments as well.
  template <TypeKind kind>
- VectorPtr newConstantFromString(
+ VectorPtr newConstantFromStringImpl(
Let's rename arguments and remove default value.
Sure. I missed this.
mbasmanova
left a comment
Thanks.
@mbasmanova has imported this pull request. If you are a Meta employee, you can view this in D85344487.
Force-pushed from ced9ba5 to 333e99e.
}
} // namespace

VectorPtr newConstantFromString(
Can we move this into the anonymous namespace?
@Yuhta This is a public API now.
@mbasmanova merged this pull request in d0f6a24.
fix: Remove redundant partition value handling in Iceberg column adaptation (facebookincubator#14516)

Summary: Implement IcebergSplitReader::adaptColumns. Without overriding this method, the current Iceberg reader uses SplitReader::adaptColumns, which is specific to the Hive implementation. One difference between Hive and Iceberg is that the Iceberg spec requires all columns to be written to data files, while Hive writes only non-partition columns to data files. There is special logic in SplitReader::adaptColumns to handle this during reads, but in Iceberg it is not needed. Also, for Hive, it populates the column data with the partition value directly; this is correct for Hive only. For Iceberg, there are different kinds of transforms other than the identity transform, and we cannot deduce the original column value from the partition value.

Pull Request resolved: facebookincubator#14516
Reviewed By: Yuhta
Differential Revision: D85344487
Pulled By: mbasmanova
fbshipit-source-id: e0de6bdd44257a37f6727840803754639accdfd1
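As a rough illustration of the change described in this summary, here is a minimal sketch of the override; this is not the actual implementation (it omits, for example, the handling of info columns such as _delete raised during review), and the RowTypePtr parameter spelling follows the reviewer's nit:

```cpp
// Sketch only: keep the types of the columns physically present in the
// data file instead of substituting partition values the way the Hive
// base class does.
std::vector<TypePtr> IcebergSplitReader::adaptColumns(
    const RowTypePtr& fileType,
    const RowTypePtr& tableSchema) const {
  // Iceberg data files contain every column, including partition columns.
  std::vector<TypePtr> columnTypes = fileType->children();
  // Unlike SplitReader::adaptColumns, do NOT overwrite partition columns
  // with constants derived from partition values: Iceberg transforms such
  // as bucket(...) or truncate(...) are not invertible.
  return columnTypes;
}
```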