@tharvey5

Summary

Currently, Apache Flink does not support storage partition join (SPJ), which can lead to unnecessary data shuffles in batch mode. We have already implemented the query planner changes in Flink in apache/flink#26715.

This feature is applied ONLY when the config table.optimizer.storage-partition-join-enabled=true is set; otherwise there is no impact on current jobs. This PR supports batch execution mode only.

This PR consists of relevant changes for the Flink Iceberg Source. Please note that these changes are ONLY included for FLIP27 (new Source API).

NOTE: We migrated to using FLIP27 and have included that backport in this PR. Backport: #10832

This PR adds the following support:
- Backport of #10832 for Iceberg 1.5.x.
- Enhances IcebergTableSource to implement the SupportsPartitioning interface, which we defined on the Flink side (apache/flink#26715) and which enables Iceberg to report partitioning metadata to the Flink query planner. This is done via outputPartitioning() returning a KeyGroupedPartitioning built from the table's partition scheme. It supports various transform types, including bucket, identity, month, day, and year.
- Improvements to IcebergSource to support StoragePartitionJoin.
- Enhances FlinkSplitPlanner with a method that groups ScanTasks by groupingKey (partition values), which ensures that all records within the same partition are processed by the same subtask.
- Adds PartitionAwareSplitAssignment capabilities, including a new PartitionAwareSplitAssignerFactory and PartitionAwareSplitAssigner, which use deterministic assignment to ensure that records from the same partition go to the same subtask.
- Includes a new SpecTransformToFlinkTransform to map the various TransformExpressions used to represent the partitions to the Flink system.
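
The grouping and deterministic assignment described in the list above can be illustrated roughly as follows. This is a minimal, self-contained sketch and not the code from this PR: the class `PartitionAwareAssignmentSketch`, the `Task` type, and the hash-modulo rule are all assumptions made for illustration, and the real `FlinkSplitPlanner` and `PartitionAwareSplitAssigner` may choose subtasks differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; not the PartitionAwareSplitAssigner from this PR. */
class PartitionAwareAssignmentSketch {

  /** Stand-in for a scan task: a data file plus its partition values (the grouping key). */
  static final class Task {
    final String file;
    final List<Object> partitionValues;

    Task(String file, List<Object> partitionValues) {
      this.file = file;
      this.partitionValues = partitionValues;
    }
  }

  /** Groups tasks by partition values so that each partition is handled as one unit. */
  static Map<List<Object>, List<Task>> groupByPartition(List<Task> tasks) {
    Map<List<Object>, List<Task>> groups = new LinkedHashMap<>();
    for (Task task : tasks) {
      groups.computeIfAbsent(task.partitionValues, key -> new ArrayList<>()).add(task);
    }
    return groups;
  }

  /**
   * Picks a subtask from the partition key and the parallelism alone (assumed rule).
   * List.hashCode() is stable across runs for String and Integer elements; other
   * value types would need an explicitly stable hash.
   */
  static int subtaskFor(List<Object> partitionValues, int parallelism) {
    return Math.floorMod(partitionValues.hashCode(), parallelism);
  }

  /** Assigns every partition group, as a whole, to exactly one subtask. */
  static Map<Integer, List<Task>> assign(List<Task> tasks, int parallelism) {
    Map<Integer, List<Task>> bySubtask = new TreeMap<>();
    for (Map.Entry<List<Object>, List<Task>> group : groupByPartition(tasks).entrySet()) {
      int subtask = subtaskFor(group.getKey(), parallelism);
      bySubtask.computeIfAbsent(subtask, key -> new ArrayList<>()).addAll(group.getValue());
    }
    return bySubtask;
  }

  public static void main(String[] args) {
    List<Task> tasks = Arrays.asList(
        new Task("f1.parquet", Arrays.<Object>asList("2025-05-01", 3)),
        new Task("f2.parquet", Arrays.<Object>asList("2025-05-01", 3)),
        new Task("f3.parquet", Arrays.<Object>asList("2025-05-02", 7)));
    for (Map.Entry<Integer, List<Task>> entry : assign(tasks, 4).entrySet()) {
      for (Task task : entry.getValue()) {
        System.out.println("subtask " + entry.getKey() + " <- " + task.file);
      }
    }
  }
}
```

Because the subtask index depends only on the partition values and the parallelism, two sources that report the same partition values route matching partitions to the same subtask index, which is what lets the join skip the shuffle.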

Testing

  • Added unit tests in TestPartitionAwareSplitAssigner to verify that splits are deterministically assigned to the correct subtasks
  • Added unit tests in TestFlinkSplitPlanner to test the new functionality for getting batchSplits based on ScanGroup
  • Added unit test TestStoragePartitionedJoin to verify that we get the correct metadata
  • Ran the following queries internally and compared the results to Spark output

Queries that correctly use SPJ:

select count(*) from iceberg.db.simple_bucketed as table1 join iceberg.db.simple_bucketed_2 as table2 on table1.user_id = table2.user_id2
select count(*) from iceberg.db.user_id_two_partition_cols t1 join iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = t2.user_id
select count(*) from iceberg.db.user_id_two_partition_cols t1 join iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = t2.user_id  where t1.dt = '2025-05-01' and t2.dt = '2025-05-01'

Queries that correctly do not use SPJ:

select count(*) from iceberg.db.simple_bucketed as table1 join iceberg.db.simple_bucketed_32 as table2 on table1.user_id = table2.user_id2
select count(*) from iceberg.db.user_id_two_partition_cols t1 join iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = t2.user_id  where t1.dt = '2025-05-01' and t2.dt = '2025-05-01' and t1.user_id =123 and t2.user_id =123
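
The PR does not spell out why the queries above fall back to a regular shuffle join. One common precondition for a storage partition join is that both sides report the same sequence of partition transforms with matching parameters, so that equal join keys land in matching partitions. The sketch below illustrates only that check. The class and field names are hypothetical, the bucket counts (16 and 32) are assumed for illustration and are not taken from the tables in this PR, and the actual planner rules in apache/flink#26715 may differ.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of a partitioning compatibility check; not the Flink planner rule. */
class SpjCompatibilitySketch {

  /** Simplified partition transform: a name plus a bucket count (0 when not a bucket transform). */
  static final class PartitionTransform {
    final String name;
    final int numBuckets;

    PartitionTransform(String name, int numBuckets) {
      this.name = name;
      this.numBuckets = numBuckets;
    }
  }

  /** Two sides are treated as compatible only if their transforms match position by position. */
  static boolean compatible(List<PartitionTransform> left, List<PartitionTransform> right) {
    if (left.size() != right.size()) {
      return false;
    }
    for (int i = 0; i < left.size(); i++) {
      PartitionTransform l = left.get(i);
      PartitionTransform r = right.get(i);
      if (!l.name.equals(r.name) || l.numBuckets != r.numBuckets) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<PartitionTransform> bucket16 = Arrays.asList(new PartitionTransform("bucket", 16));
    List<PartitionTransform> bucket32 = Arrays.asList(new PartitionTransform("bucket", 32));
    System.out.println("same bucket count:      " + compatible(bucket16, bucket16));
    System.out.println("different bucket count: " + compatible(bucket16, bucket32));
  }
}
```

Column names are deliberately left out of the comparison, since the join condition (for example user_id = user_id2) is what relates the partition columns of the two tables.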

Tucker Harvey and others added 2 commits June 25, 2025 14:37
…h execution mode

Backport apache#10832

This PR backports inferring source parallelism for the
[FLIP-27](https://jira.pinadmin.com/browse/FLIP-27) source in batch
execution mode.

Note:
This is not a clean backport.
RowDataConverter is not present in our forked version, so I had to add it
to make this backport work.
@github-actions github-actions bot added the flink label Jun 25, 2025
@tharvey5 tharvey5 changed the title from "Storage partition join oss" to "[Improvement] Storage Partition Join" Jun 25, 2025
@nastra
Contributor

nastra commented Jun 26, 2025

@tharvey5 I see this is targeting 1.5.x. We don't do any new releases of that version, and we generally recommend that people upgrade to the latest Iceberg version.

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jul 27, 2025
@github-actions

github-actions bot commented Aug 3, 2025

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 3, 2025