Skip to content

Conversation

@PingLiuPing
Copy link
Collaborator

There are two writer mode supported by iceberg, one is fanout (default), the other one clustered.
This PR implements clustered writer mode.
When using clustered input mode, the input data is assumed to be clustered/partitioned beforehand.

@netlify
Copy link

netlify bot commented Sep 1, 2025

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit f02ffce
🔍 Latest deploy log https://app.netlify.com/projects/meta-velox/deploys/68b6ed5c3a3f0e00082a8af1

@meta-cla meta-cla bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Sep 1, 2025
@PingLiuPing PingLiuPing force-pushed the lp_iceberg_clustered_writer branch from 65e8480 to dc64f0a Compare September 2, 2025 10:19
@PingLiuPing PingLiuPing force-pushed the lp_iceberg_clustered_writer branch from dc64f0a to 298c0c4 Compare September 2, 2025 10:38
@PingLiuPing PingLiuPing force-pushed the lp_iceberg_clustered_writer branch from 298c0c4 to f02ffce Compare September 2, 2025 13:12
jinchengchenghh added a commit to apache/incubator-gluten that referenced this pull request Sep 3, 2025
Add Protobuf struct IcebergPartitionField to transfer the iceberg id information, add IcebergPartitionSpec to transfer partition information.
Build with test and benchmark in CI and fix IcebergWriteTest build.
Set the file format to orc to bypass native parquet write for partitioned tpch iceberg suite, after facebookincubator/velox#14670 which supports fanout false mode merged, we can relax the restriction.

Relevant PR: facebookincubator/velox#13874
std::unique_ptr<DataFileStatsCollector> icebergStatsCollector_;

// Below are structures for clustered mode writer.
const bool fanoutEnabled_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move const member upfront

@majetideepak
Copy link
Collaborator

@PingLiuPing Can you add more details on the iceberg clustered writes?

When using clustered input mode, the input data is assumed to be clustered/partitioned beforehand.

In the current implementation, who handles the clustering and partitioning?

@PingLiuPing
Copy link
Collaborator Author

@majetideepak , thanks for the comment.

In the current implementation, who handles the clustering and partitioning?

It should be processed by the caller to guarantee the input data is partitioned first. For example, for identity partition transform (same with Hive), and suppose the partition column type is integer. The valid input stream is 1,1,1,2,2,2,3,3,3. But if the input stream is 1,1,1,2,2,1, then the cluster mode will report error.

@majetideepak
Copy link
Collaborator

majetideepak commented Sep 11, 2025

I now see that this PR corresponds to the last commit and the other commits are part of other PRs.


namespace facebook::velox::connector::hive::iceberg {

struct IcebergNestedField {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ColumnIdentity. Both Presto and Trino are using this name.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yingsu00 Thanks for the comments. This PR contains 7 commits but only the last commit is for clustered writer. I have to do this to pass the CI because it depends the previous commits but they are not merged.
Sorry for the confuse.


std::function<IcebergDataFileStatsSettings(
const IcebergNestedField&, const TypePtr&, bool)>
buildNestedField = [&](const IcebergNestedField& f,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to make buildNestedField a private function instead of a lambda here? This is hard to read.

bool skipBounds) -> IcebergDataFileStatsSettings {
VELOX_CHECK_NOT_NULL(type, "Input column type cannot be null.");
bool currentSkipBounds = skipBounds || type->isMap() || type->isArray();
IcebergDataFileStatsSettings field(f.id, currentSkipBounds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name "field" is confusing. Can we just name it statsSettings?

@PingLiuPing
Copy link
Collaborator Author

@yingsu00 Thanks for the comments, I will address your comments when split these huge PRs to smaller pieces.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants