feat: Support iceberg partition transform #15509
PingLiuPing wants to merge 1 commit into facebookincubator:main from
Conversation
@mbasmanova As all the preliminary work has been merged, this PR integrates it together; with this PR we support writing partitioned Iceberg tables.
    auto commitDataJson = folly::toJson(commitData);
    commitTasks.push_back(commitDataJson);
  }
  return commitTasks;
}

void IcebergDataSink::appendData(RowVectorPtr input) {
What's the difference between this method's implementation in Iceberg and Hive?
Thanks for the comment.
There are two differences:
- Hive has a minor optimization when writing a partitioned table: when the input data has only one partition, it directly calls write and returns. Iceberg, however, needs to save the partition values in splitInputRowsAndEnsureWriters for the commit message (passed back to the upstream engine).
- Iceberg does not need to check isBucketed.
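A minimal, standalone sketch of that splitting step follows. The names (splitRows, SplitResult) are hypothetical; the real implementation operates on Velox vectors and HiveWriterIds, and is only summarized here.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical sketch: split input rows by partition id and, unlike Hive,
// also remember each partition's values so they can be passed back to the
// upstream engine in the commit message.
struct SplitResult {
  // Row indices destined for each partition's writer.
  std::unordered_map<int, std::vector<size_t>> rowsPerPartition;
  // Partition values recorded once per partition, for the commit message.
  std::unordered_map<int, std::string> partitionValues;
};

SplitResult splitRows(
    const std::vector<int>& partitionIds,
    const std::vector<std::string>& values) {
  SplitResult result;
  for (size_t row = 0; row < partitionIds.size(); ++row) {
    const int id = partitionIds[row];
    result.rowsPerPartition[id].push_back(row);
    // emplace records the value only the first time a partition is seen;
    // Iceberg does this even when the input contains a single partition.
    result.partitionValues.emplace(id, values[row]);
  }
  return result;
}
```

The single-partition shortcut Hive takes would skip the partitionValues bookkeeping entirely, which is why Iceberg cannot reuse it unchanged.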
  }
}

HiveWriterId IcebergDataSink::getIcebergWriterId(size_t row) const {
What is the difference between Iceberg writer ID and Hive writer ID?
Thanks.
There is no difference between the Iceberg writer ID and the Hive writer ID; for Iceberg we can skip checking isBucketed.
for iceberg we can skip checking isBucketed.
Is this because Iceberg doesn't support bucketing? Do we need a separate getIcebergWriterId method or can this logic be unified?
Yes, Iceberg does not support bucketing; the bucket transform is a different concept.
Do we need a separate getIcebergWriterId method or can this logic be unified?
Yes, we just need to make some changes to HiveDataSink to move the getWriterId implementation from the .cpp to the .h. Otherwise there is a linking error, since getWriterId is declared inline.
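The linking issue mentioned here is general C++ behavior, not Velox-specific: an inline member function must have its definition visible in every translation unit that calls it, otherwise those translation units are left with an unresolved symbol at link time. A minimal illustration with hypothetical names:

```cpp
#include <cassert>

// Header-style sketch: because getWriterId is declared inline, its body must
// live in the header (visible to every including .cpp file). If the body
// were moved to a single .cpp file, other translation units calling
// getWriterId would fail to link with an "undefined reference" error.
class WriterIdSource {
 public:
  inline int getWriterId(int row) const {
    return row % numWriters_;
  }

 private:
  int numWriters_ = 4;
};
```

This is why moving getWriterId's implementation from HiveDataSink.cpp into the header makes it reusable from IcebergDataSink.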
@@ -576,6 +576,7 @@ class HiveDataSink : public DataSink {
      uint32_t bucketCount,
      std::unique_ptr<core::PartitionFunction> bucketFunction,
      const std::vector<column_index_t>& partitionChannels,
      const std::vector<column_index_t>& dataChannels,
Would you add a @param comment for this new argument?
  for (column_index_t i = 0; i < childrenSize; i++) {
    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
        partitionChannels.cend()) {
  for (auto i = 0; i < insertTableHandle->inputColumns().size(); i++) {
nit: add a temp variable for insertTableHandle->inputColumns() for readability
@@ -385,7 +382,15 @@ HiveDataSink::HiveDataSink(
              inputType)
          : nullptr,
      getPartitionChannels(insertTableHandle),
      nullptr) {}
      getNonPartitionChannels(insertTableHandle),
      !getPartitionChannels(insertTableHandle).empty()
It seems sub-optimal to call getPartitionChannels(insertTableHandle) twice and create a vector just to check if it is empty
Thanks, I refactored the code and moved getPartitionChannels to HiveInsertTableHandle
      !getPartitionChannels(insertTableHandle).empty()
          ? std::make_unique<PartitionIdGenerator>(
                inputType,
                getPartitionChannels(insertTableHandle),
this is a third call to getPartitionChannels
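The refactor described above (moving getPartitionChannels into HiveInsertTableHandle) amounts to computing the channel list once and handing out a const reference. A plain-C++ sketch under that assumption, with simplified names (the real class carries many more members):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical sketch: compute the partition channels once at construction
// and expose a const accessor, instead of rebuilding the vector on every
// call just to check whether it is empty.
class InsertTableHandle {
 public:
  explicit InsertTableHandle(std::vector<uint32_t> partitionChannels)
      : partitionChannels_{std::move(partitionChannels)} {}

  // Returns a reference to the cached vector; no per-call allocation.
  const std::vector<uint32_t>& partitionChannels() const {
    return partitionChannels_;
  }

  bool isPartitioned() const {
    return !partitionChannels_.empty();
  }

 private:
  const std::vector<uint32_t> partitionChannels_;
};
```

Callers can then write handle.isPartitioned() instead of materializing a fresh vector three times in the constructor's initializer list.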
  // message sent to Presto. Indexed by writer index. Each entry contains the
  // partition values (as folly::dynamic) for that writer's partition, which
  // are serialized to JSON as "partitionDataJson" in the commit protocol.
  // These values are distinct from the transformed partition values in
These values are distinct
Are you saying that the data is the same, but it is represented as JSON here and as Velox Vectors in partitionIdGenerator_? Perhaps, clarify.
Yes, the data is the same but in a different format: columnar vs. row, stored in folly::dynamic so it is easy to serialize to JSON. I will refine the comment.
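A standalone illustration of the columnar-to-row conversion being discussed. Plain std::string stands in for folly::dynamic, and toRowWise is a hypothetical name; it only shows the shape of the transformation, not the actual Velox code.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch: the same partition data held column-wise (one vector
// per partition channel, as in partitionIdGenerator_) is converted to one
// row-wise record per writer, ready to serialize into the commit message.
std::vector<std::vector<std::string>> toRowWise(
    const std::vector<std::vector<std::string>>& columns) {
  const size_t numRows = columns.empty() ? 0 : columns[0].size();
  std::vector<std::vector<std::string>> rows(numRows);
  for (size_t r = 0; r < numRows; ++r) {
    for (const auto& column : columns) {
      // Each row record gathers the r-th value from every column.
      rows[r].push_back(column[r]);
    }
  }
  return rows;
}
```

The content is identical either way; only the layout (columnar vectors vs. per-writer row records) differs, which is what the refined comment should say.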
  return options;
}

std::vector<folly::dynamic> IcebergDataSink::makeCommitPartitionValue(
Can we return folly::array instead of std::vector?
Yes, we can just return folly::dynamic.
  std::vector<folly::dynamic> partitionValues(partitionChannels_.size());
  const auto& transformedValues = partitionIdGenerator_->partitionValues();
  for (auto i = 0; i < partitionChannels_.size(); ++i) {
    const auto& block = transformedValues->childAt(i);
block is a Presto term; do not use in Velox
    if (block->isNullAt(writerIndex)) {
      partitionValues[i] = nullptr;
    } else {
      DecodedVector decoded(*block);
No need to decode the whole vector to extract a single value. Just use SimpleVector<T>::valueAt
#ifdef VELOX_ENABLE_PARQUET
  auto parquetOptions =
      std::dynamic_pointer_cast<parquet::WriterOptions>(options);
  VELOX_CHECK_NOT_NULL(parquetOptions);
  parquetOptions->parquetWriteTimestampTimeZone = std::nullopt;
  parquetOptions->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds;
#endif
Why is this difference? Is this intentional?
Would it be possible to update PR description to list all differences between Hive and Iceberg?
Yes, this is intentional.
See https://iceberg.apache.org/spec/#parquet; there are requirements when writing Parquet files. For timestamps, the precision is microseconds (the Velox default is nanoseconds), and values should not be converted to the UTC timezone.
Thank you for clarifying. Just to make sure, this is an Iceberg-specific requirement that applies to Parquet only. Is this so? Would be nice to add the link above as a comment in the code.
Thanks. Yes, this is an Iceberg-specific requirement.
Sure, will add a comment here.
  updatePartitionRows(index, numRows, row);

  if (commitPartitionValue_[index].empty()) {
Would it make sense to move this logic into ensureWriter? We can then have shared splitInputRowsAndEnsureWriters logic and custom versions of ensureWriter.
Thanks.
Yes, it makes sense. This way we can remove splitInputRowsAndEnsureWriters and appendData from IcebergDataSink.
Force-pushed from b099733 to 84d45ef.
velox/connectors/hive/HiveDataSink.h
Outdated
@@ -343,13 +355,35 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
  const std::shared_ptr<const LocationHandle> locationHandle_;

 private:
  std::vector<column_index_t> computePartitionChannels() const {
Let's move ctor implementation to .cpp and replace computeXxx methods with free functions.
I also moved the constructor from .h to .cpp.
velox/connectors/hive/HiveDataSink.h
Outdated
@@ -671,7 +708,19 @@ class HiveDataSink : public DataSink {

  // Get the HiveWriter corresponding to the row
  // from partitionIds and bucketIds.
  FOLLY_ALWAYS_INLINE HiveWriterId getWriterId(size_t row) const;
  FOLLY_ALWAYS_INLINE HiveWriterId getWriterId(size_t row) const {
Let's remove FOLLY_ALWAYS_INLINE and move the implementation to .cpp file.
  }
  auto fileFormat = dwio::common::toString(insertTableHandle->storageFormat());
  return {
      fmt::format("{}.{}", targetFileName, fileFormat),
nit: can we avoid formatting the same string twice?
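One way to address the nit, sketched with plain std::string concatenation instead of fmt::format; makeFileNames is a hypothetical helper name, not the actual Velox function:

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical sketch: build "name.format" once and reuse it for both
// returned file names, instead of formatting the same string twice.
std::pair<std::string, std::string> makeFileNames(
    const std::string& targetFileName,
    const std::string& fileFormat) {
  std::string name = targetFileName + "." + fileFormat;
  // Target file name and write file name are identical here, so a single
  // formatted string serves both slots of the returned pair.
  return {name, name};
}
```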
}

folly::dynamic IcebergFileNameGenerator::serialize() const {
  VELOX_UNREACHABLE();
Don't we need to implement this?
I found that the serialization and de-serialization logic are only used in tests. See HiveConnector::registerSerDe().
Hmm... if so, why do we need it at all? Would you look into who introduced this API/logic, and let's see if we can just delete it.
@mbasmanova Thanks.
I found that SerDe is necessary as part of the test infrastructure, but I still cannot find it being used in production code.
I searched for registerSerDe in the Prestissimo code and found it is also only used in test code.
In Velox, SerDe is called from the top down and is used for a few purposes:
- In the fuzzer, for remote query execution: velox/velox/exec/fuzzer/VeloxQueryRunner.cpp, lines 202 to 214 in 7d2979d.
- Query tracing and replay. Tracing: velox/velox/exec/TaskTraceWriter.cpp, lines 36 to 65 in 7d2979d. Replaying: velox/velox/exec/TaskTraceReader.cpp, lines 26 to 37 in 7d2979d. And TraceReplayRunner: velox/velox/tool/trace/TraceReplayRunner.cpp, lines 255 to 312 in 7d2979d.
I think we need to add a SerDe implementation.
Would it make sense to add SerDe in a separate PR? It should be added to a few classes.
Would it make sense to add SerDe in a separate PR?
Sounds good. Let's open an issue.
  VELOX_USER_CHECK_EQ(
      tableStorageFormat,
      dwio::common::FileFormat::PARQUET,
      "Only Parquet file format is supported when writing Iceberg tables. Format: {}",
No need to include format in the message. It is included automatically. Please, trigger this error and check the error message.
Thanks, the format in the message is redundant.
C++ exception with description "Exception: VeloxUserError
Error Source: USER
Error Code: INVALID_ARGUMENT
Reason: (dwrf vs. parquet) Only Parquet file format is supported when writing Iceberg tables. Format: dwrf
@@ -239,6 +344,9 @@ std::vector<std::string> IcebergDataSink::commitMessage() const {
      ("fileFormat", "PARQUET")
      ("content", "DATA");
  // clang-format on
  if (!(commitPartitionValue_.empty() || commitPartitionValue_[i].isNull())) {
    commitData["partitionDataJson"] = toJsonString(commitPartitionValue_[i]);
toJsonString function is used only once and is very short; consider removing it and writing out logic here; this would make it easier to understand the overall format of commit data
  for (const auto& file : files) {
    std::vector<std::string> pathComponents;
    folly::split("/", file, pathComponents);
This logic is repeated. Would it be possible to extract a helper function that takes a path and returns a map of partition keys? Then reuse it in multiple places?
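A possible shape for the suggested helper, using only the standard library; parsePartitionKeys is a hypothetical name, and the real tests use folly::split rather than std::getline:

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Hypothetical helper: parse the "key=value" components of a partition path
// (e.g. "c1=7/c2=abc/data.parquet") into a map of partition keys. Components
// without '=' (such as the file name) are skipped.
std::map<std::string, std::string> parsePartitionKeys(const std::string& path) {
  std::map<std::string, std::string> keys;
  std::istringstream stream(path);
  std::string component;
  while (std::getline(stream, component, '/')) {
    const auto eq = component.find('=');
    if (eq != std::string::npos) {
      keys[component.substr(0, eq)] = component.substr(eq + 1);
    }
  }
  return keys;
}
```

With a helper like this, each test can compare the returned map against expected partition keys instead of re-splitting paths inline.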
    std::vector<std::string> parts;
    folly::split('=', component, parts);
    ASSERT_EQ(parts.size(), 2);
    ASSERT_EQ(parts[0], rowType->nameOf(colIndex));
The verification logic might be simpler if we just hard-code expected path for each test case.
  auto rowType = ROW(
      {"c1", "c2", "c3", "c4", "c5", "c6"},
      {BIGINT(), INTEGER(), SMALLINT(), DECIMAL(18, 5), BOOLEAN(), VARCHAR()});
  for (auto colIndex = 0; colIndex < rowType->size(); colIndex++) {
It looks like this test has 6 test cases; each test case tests a single column. If so, it would be clearer to write it that way by creating a testCase struct that contains a column name, type, expected path, etc., then looping over these.
  const auto commitTasks = dataSink->close();
  auto splits = createSplitsForDirectory(outputDirectory->getPath());

  ASSERT_GT(commitTasks.size(), 0);
Can we assert specific number of tasks? Is this non-deterministic? Why?
Thanks. Yes, it is non-deterministic, since the number of tasks depends on the number of writers, which depends on the number of partitions, which in turn depends on the data; the data is random data generated by a fuzzer.
Let's use a seed to make the fuzzer-generated data deterministic.
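The determinism a fixed seed buys can be illustrated generically. This is not the Velox fuzzer API, just a standard seeded RNG; the same principle lets the test assert an exact number of commit tasks:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Generic illustration: with a fixed seed, "random" test data is fully
// reproducible, so anything derived from it (partitions, writers, commit
// tasks) becomes deterministic and can be asserted exactly.
std::vector<uint32_t> makeRandomData(uint32_t seed, size_t n) {
  std::mt19937 rng(seed);
  std::vector<uint32_t> data(n);
  for (auto& v : data) {
    v = rng();
  }
  return data;
}
```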
    RowTypePtr rowType) {
  std::vector<RowVectorPtr> batches;
  static const std::vector<Timestamp> timestamps = {
      Timestamp(0, 0), // 1970-01-01 00:00:00
Would you add a comment to explain how you chose these values?
Integrate Iceberg partition transform functionality with IcebergDataSink.
Complete support for writing partitioned Iceberg tables, including identity,
bucket, truncate, and temporal transforms.
Add end-to-end tests covering various partition transform scenarios.
Differences between Hive and Iceberg when writing Parquet data files:
- Hive uses the session timezone from connectorQueryCtx_->sessionTimezone() and respects the adjustTimestampToTimezone setting; Iceberg ignores the session timezone and is timezone-agnostic.
- Hive uses a configurable timestamp precision; Iceberg uses microseconds for Iceberg spec compliance.
- Hive supports bucketing and uses special file naming conventions (in the non-bucketed case, Hive tracks which task/driver wrote which file); Iceberg uses simple UUID-based names.