feat: Collect Iceberg stats #16062
PingLiuPing wants to merge 5 commits into facebookincubator:main
Conversation
Force-pushed eff6947 to 1af47a8.
@mbasmanova @majetideepak @Yuhta Could you please take a look at this PR? Thank you very much.
mbasmanova left a comment
I don't see logic that uses the stats config. Can you add that?
VELOX_CHECK_EQ(field.children.size(), type->size());
config.children.reserve(field.children.size());

if (type->isRow()) {
No need to switch on type. Just loop over [0, Type::size()) and use Type::childAt.
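For illustration, the suggested type-agnostic traversal could look roughly like the sketch below. The `Type`, `FieldId`, and `StatsConfig` structs here are minimal hypothetical stand-ins (the real code uses `velox::Type` and the field-id struct from this PR); leaf types have `size() == 0`, so the loop body simply never runs for them and no switch on the type kind is needed.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Minimal stand-in for velox::Type: only children, size(), childAt().
struct Type {
  std::vector<std::shared_ptr<Type>> children;
  size_t size() const { return children.size(); }
  const std::shared_ptr<Type>& childAt(size_t i) const { return children[i]; }
};

// Stand-ins for the Iceberg field-id and stats-config structs from the PR.
struct FieldId {
  int32_t fieldId;
  std::vector<FieldId> children;
};

struct StatsConfig {
  int32_t fieldId;
  std::vector<StatsConfig> children;
};

// Loop over [0, Type::size()) and recurse via Type::childAt instead of
// switching on the type kind.
StatsConfig toStatsConfig(const FieldId& field, const Type& type) {
  assert(field.children.size() == type.size()); // mirrors VELOX_CHECK_EQ
  StatsConfig config{field.fieldId, {}};
  config.children.reserve(field.children.size());
  for (size_t i = 0; i < type.size(); ++i) {
    config.children.push_back(
        toStatsConfig(field.children[i], *type.childAt(i)));
  }
  return config;
}
```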
@@ -392,6 +445,28 @@ IcebergDataSink::createWriterOptions() const {
  // (TimestampPrecision::kMicroseconds).
  options->serdeParameters["parquet.writer.timestamp.unit"] = "6";
  options->serdeParameters["parquet.writer.timestamp.timezone"] = "";
std::function<folly::dynamic(const IcebergFieldStatsConfig&)> toJson =
Are we missing logic to serialize 'skipBounds'?
Can you move serde logic to a method on IcebergFieldStatsConfig?
Thanks. Deleted the serde logic and am now directly passing ParquetFieldId to the writer options.
/// Config for collecting Iceberg parquet field statistics.
/// Holds the Iceberg source field id and whether to skip bounds
/// collection for this field. For nested fields, it contains child fields.
struct IcebergFieldStatsConfig {
I assume this struct is specific to Parquet. If that's the case, we may want to add Parquet to the name.
perhaps, IcebergParquetWriterStatsConfig
velox/dwio/parquet/writer/Writer.h
// of field ID structures. Each structure has "fieldId" (int32) and optional
// "children" (array of nested structures).
// Example: [{"fieldId":1,"children":[{"fieldId":2}]},{"fieldId":3}].
static constexpr const char* kParquetSerdeFieldIds =
Sorry, but I do not understand. Would you clarify some more? This config seems to control how writer collects stats. It doesn't appear to be used in any sort of serialization or deserialization. Am I missing something?
Oh, sorry for the confusion — let me clarify.
This config indeed controls how the writer collects stats and serves two purposes:
- It provides the Parquet field_id, which should be passed down to the Parquet writer and written into the Parquet data file.
- It provides skipBounds, which indicates whether min/max statistics should be collected for a given Parquet field. This is used after the Parquet data is written, during the conversion from Parquet stats to Iceberg stats (not implemented yet).
For the first part, ideally we would just set ParquetFieldId directly here. However, I’ve run into issues with this approach in the past (see:
#15509 (comment)).
To avoid explicitly depending on symbols from the Parquet writer, I pass this parameter to the Parquet writer via serdeParameters instead.
An alternative could be to change the CMake configuration so that connectors/hive/iceberg is only compiled when VELOX_ENABLE_PARQUET is enabled and then we can safely include parquet symbols in here.
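To illustrate the serdeParameters approach described above, a minimal sketch of serializing a field-id tree into the JSON shape documented for kParquetSerdeFieldIds. The `FieldId` struct and hand-rolled `toJson` here are hypothetical stand-ins for illustration only; the PR's earlier version used folly::dynamic for this.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for the Iceberg field-id tree.
struct FieldId {
  int32_t fieldId;
  std::vector<FieldId> children;
};

// Serialize to the documented shape, e.g.
// [{"fieldId":1,"children":[{"fieldId":2}]},{"fieldId":3}], so the string
// can be passed to the Parquet writer via serdeParameters without the
// connector linking against Parquet writer symbols.
std::string toJson(const std::vector<FieldId>& fields) {
  std::string out = "[";
  for (size_t i = 0; i < fields.size(); ++i) {
    if (i > 0) {
      out += ",";
    }
    out += "{\"fieldId\":" + std::to_string(fields[i].fieldId);
    if (!fields[i].children.empty()) {
      // Nested fields recurse into the same array shape.
      out += ",\"children\":" + toJson(fields[i].children);
    }
    out += "}";
  }
  return out + "]";
}
```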
@yingsu00 Ying, do you have any thoughts on tight coupling between the Iceberg connector and the Parquet format? See #16062 (comment). It seems non-ideal for the connector to depend directly on a particular storage format.
Force-pushed 98dd9ec to 056d456.
@mbasmanova Thank you for reviewing this PR. I’ve now merged all Iceberg stats collection logic into this PR and updated the description accordingly. Initially I planned to split it into 3 PRs, with this one only covering stats configuration, but consolidating them makes the overall design easier to review. Regarding the concern about tight coupling between the Iceberg connector and Parquet: at the moment, the Iceberg connector depends on Parquet in three places. I couldn’t find a more elegant way to fully decouple this at the moment. For Parquet field IDs in particular, since RowType does not carry equivalent IDs, we eventually need a mechanism to pass these IDs to the Parquet writer; the current approach passes them via writer options. While this PR is mainly focused on Iceberg stats collection, I plan to refactor iceberg/CMakeLists.txt in the near future to split the target into separate reader and writer components. Since Parquet is currently the only supported file format for writing, the writer target can then be conditionally compiled only when VELOX_ENABLE_PARQUET is enabled.
I think Ying's PR #14090 could help resolve the parquet reader and writer reference issue from test. |
Force-pushed 056d456 to 3c05a02.
I believe Iceberg supports other file formats as well. How does it pass field IDs to these? |
Yes, you’re right, Iceberg supports Avro, Parquet, and ORC. According to the Iceberg spec, all supported file formats are required to store field IDs (see the format-specific requirements in the spec: https://iceberg.apache.org/spec/#appendix-a-format-specific-requirements). There is also a current limitation on the Iceberg reader side when reading an Iceberg table written by another engine, even if the Parquet schema contains field IDs. I had a quick look at the Iceberg Java implementation; there is explicit logic to translate Iceberg field IDs into the corresponding IDs of the underlying file format.
In our case, the Iceberg field IDs are computed and passed down from the coordinator node (Java) to the writer. These IDs are exported to Arrow and eventually written to the Parquet file schema.
@mbasmanova Sorry, I realized I forgot to tag you earlier. Just flagging it here in case you have a chance to take a look and would love to hear your thoughts. |
mbasmanova left a comment
@PingLiuPing I'm seeing a number of warnings. Any chance you could go over these and address?
feat: Collect Iceberg stats
I assume this refers to file-level stats. If so, perhaps, update PR title to clarify.
// @param field The Parquet field ID containing the field ID and child field
// IDs.
// @param type The Velox type corresponding to this field.
// @param skipBounds Whether to skip bounds collection for this field and its
@PingLiuPing Can you remind me why do we need skipBounds functionality? Why don't we collect min/max values unconditionally?
Is skipBounds == true equivalent to saying that a column is a map/array or a subfield of such?
Thanks @mbasmanova.
This is a bit tricky. Initially, I was collecting min/max unconditionally for all types, and the Iceberg spec doesn’t explicitly describe this behavior. During testing, I found that both Presto and Spark do not collect min/max statistics for array and map types.
Looking into the Iceberg source code, there is indeed such a limitation enforced there. For reference:
https://github.com/apache/iceberg/blob/5970ddd9278a2baa060183a18f895de3608eab1f/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetrics.java#L185-L200
Is skipBounds == true equivalent to saying that a column is a map/array or a subfield of such?

Yes, based on the current implementation we can make that assumption.
@PingLiuPing What's the downside of collecting these stats?
If we do need to skip collecting min/max for arrays, maps and their subfields, then let's rename 'skipBounds', remove it from public API and make it an implementation detail.
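As a sketch of that implementation detail, the skip decision could be propagated top-down over the type tree roughly as below. The types and pre-order field-id assignment here are simplified stand-ins for illustration (the real code works on velox::Type and the Iceberg field-id struct), but the rule matches what is discussed above: arrays, maps, and everything nested under them skip min/max bounds.

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// Simplified stand-in for the Velox type tree; only the kind matters here.
enum class Kind { kPrimitive, kRow, kArray, kMap };
struct Type {
  Kind kind;
  std::vector<Type> children;
};

// Collect the field ids whose min/max bounds should be skipped: arrays,
// maps, and every field nested under them. Field ids are assumed to be
// assigned in pre-order for brevity.
void collectSkipBounds(
    const Type& type,
    int32_t& nextId,
    bool skip,
    std::unordered_set<int32_t>& skipIds) {
  const int32_t id = nextId++;
  // Once a parent is skipped, all of its subfields are skipped too.
  const bool currentSkip =
      skip || type.kind == Kind::kArray || type.kind == Kind::kMap;
  if (currentSkip) {
    skipIds.insert(id);
  }
  for (const auto& child : type.children) {
    collectSkipBounds(child, nextId, currentSkip, skipIds);
  }
}
```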
bool currentSkipBounds = skipBounds || type->isMap() || type->isArray();
IcebergParquetStatsConfig config(field.fieldId, currentSkipBounds);

if (!field.children.empty()) {
Do we need this check? Can we execute the logic inside the 'if' body unconditionally?
Thanks, it can be removed.
auto icebergColumnHandle =
    checkedPointerCast<const IcebergColumnHandle>(columnHandle);
icebergParquetStatsConfig_.push_back(toIcebergFieldStatsConfig(
    icebergColumnHandle->field(), icebergColumnHandle->dataType(), false));
Would you annotate 'false' with the parameter name ?
/*param=*/false
("partitionSpecJson",
    icebergInsertTableHandle->partitionSpec() ? icebergInsertTableHandle->partitionSpec()->specId : 0)
// Sort order evolution is not supported. Set default id to 1.
Thanks.
From iceberg spec https://iceberg.apache.org/spec/#sorting, a sort order is defined by a sort order id and a list of sort fields.
Order id 0 is reserved for the unsorted order. Will change this to 0.
/// Iterates through all row groups and columns to collect:
/// - Record count, split offsets, value counts, column sizes, null counts.
/// - Min/max bounds (base64-encoded) for columns not in skipBounds set.
/// @param metadata Pointer to shared_ptr<parquet::arrow::FileMetaData>.
Why is it being passed as void*?
Thanks. Refactored the code to use a template class.
dataFileStats->numRecords = fileMetadata->num_rows();
const auto numRowGroups = fileMetadata->num_row_groups();
for (auto i = 0; i < numRowGroups; ++i) {
  const auto& rgm = fileMetadata->RowGroup(i);
// e.g., schema_->size(). It also contains the sub-fields when there are
// nested data types in table's schema.
std::unordered_set<int32_t> skipBoundsFields;
int32_t numFields = 0;
Is numFields used to sanity check row group metadata? Is this needed?
Thanks. You are right, this can be removed.
std::unordered_map<int32_t, std::shared_ptr<parquet::arrow::Statistics>>
    globalMinStats;
std::unordered_map<int32_t, std::shared_ptr<parquet::arrow::Statistics>>
    globalMaxStats;
Do we need 2 maps? Can we use a single map with value being a pair of stats?
Thanks. Merged into a single map.
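A minimal sketch of that single-map shape, with plain integers standing in for the shared_ptr<parquet::arrow::Statistics> values the real code holds; the merge function and struct names are illustrative only.

```cpp
#include <algorithm>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical per-row-group column stats; the real code carries
// std::shared_ptr<parquet::arrow::Statistics> rather than raw ints.
struct ColumnStats {
  int32_t columnId;
  int64_t min;
  int64_t max;
};

// One map keyed by column id whose value is a (min, max) pair, replacing
// the two parallel globalMinStats/globalMaxStats maps.
using MinMaxMap = std::unordered_map<int32_t, std::pair<int64_t, int64_t>>;

void mergeRowGroup(MinMaxMap& global, const std::vector<ColumnStats>& rg) {
  for (const auto& s : rg) {
    auto it = global.find(s.columnId);
    if (it == global.end()) {
      // First row group seen for this column seeds both bounds at once.
      global.emplace(s.columnId, std::make_pair(s.min, s.max));
    } else {
      it->second.first = std::min(it->second.first, s.min);
      it->second.second = std::max(it->second.second, s.max);
    }
  }
}
```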
Force-pushed 124e6a3 to a1b87a0.
@kgpai Thanks, tested locally with VELOX_ENABLE_PARQUET=OFF.
@PingLiuPing Update: almost there; some builds/tests are failing, which I hope to get over the line today.
@kgpai Thank you so much.
@PingLiuPing HiveDataSinkTest.memoryReclaimAfterClose is seeing thread sanitizer issues with this change, which is what I am investigating. Will update you if some code changes are required to fix that.
@kgpai Thank you for helping import this PR; let me know if you need anything from my side.
@PingLiuPing Had to make some changes on the parquet side to get our gluten builds working. So exporting that to this PR.
@PingLiuPing I had to make some changes to get things to work, but unfortunately can't export them.
Force-pushed a1b87a0 to 9ed076d.
@kgpai Thanks, I updated the code. I also fixed another code conflict related to writer rotation.
Force-pushed 9ed076d to 9c903cf.
@kgpai Thank you for importing again. I see the internal build and tests are still failing; is there anything I can do?
No, a few more tests need fixing; hoping to resolve that by tomorrow.
@PingLiuPing Thank you for bearing with me; almost there with this diff. If all goes well, it should land today.
@kgpai Thank you so much, this will make my ongoing work much easier. |
This reverts commit e7dd656.
Implements Iceberg Parquet data file statistics collection.
The stats include record count, split offsets, value counts, column sizes, null counts, and min/max bounds. These stats are represented by the struct IcebergDataFileStatistics.
Added IcebergParquetStatsCollector for collecting Iceberg Parquet file stats.
This collector takes Parquet file metadata as input, aggregates across all row groups, and computes global (per-file) min/max bounds. The file metadata is returned when calling Writer::close().
Once the stats are collected, the full stats are included in the commit message JSON (previously only recordCount was included).
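The per-file aggregation described above can be sketched as follows. The `ColumnChunk`/`RowGroup` structs are hypothetical stand-ins for what the real collector reads from parquet::arrow::FileMetaData (via RowGroup(i) and its column chunk metadata); min/max handling is omitted here since it depends on the skip-bounds logic discussed earlier.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical shapes standing in for Parquet file metadata.
struct ColumnChunk {
  int32_t columnId;
  int64_t numValues;
  int64_t nullCount;
  int64_t totalCompressedSize;
};
struct RowGroup {
  int64_t numRows;
  std::vector<ColumnChunk> columns;
};

// Illustrative stand-in for IcebergDataFileStatistics.
struct DataFileStats {
  int64_t numRecords = 0;
  std::unordered_map<int32_t, int64_t> valueCounts;
  std::unordered_map<int32_t, int64_t> nullCounts;
  std::unordered_map<int32_t, int64_t> columnSizes;
};

// Aggregate file-level stats across all row groups, as the collector does
// after Writer::close() hands back the file metadata.
DataFileStats collect(const std::vector<RowGroup>& rowGroups) {
  DataFileStats stats;
  for (const auto& rg : rowGroups) {
    stats.numRecords += rg.numRows;
    for (const auto& c : rg.columns) {
      stats.valueCounts[c.columnId] += c.numValues;
      stats.nullCounts[c.columnId] += c.nullCount;
      stats.columnSizes[c.columnId] += c.totalCompressedSize;
    }
  }
  return stats;
}
```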